
Commit

Json optimize (#91)
* Optimize JSON array data for non client-streaming calls

* Check for client streaming in worker makeRequest()

* Code cleanup

* Add prerelease flag to goreleaser file

* Remove test data file that got added by accident

* Fix goreleaser archive deprecation
bojand authored May 13, 2019
1 parent d763723 commit 5020ef6
Showing 6 changed files with 120 additions and 80 deletions.
15 changes: 9 additions & 6 deletions .goreleaser.yml
@@ -17,16 +17,19 @@ builds:
- windows
goarch:
- amd64
archive:
replacements:
darwin: Darwin
linux: Linux
windows: Windows
amd64: x86_64
archives:
- id: ghz
replacements:
darwin: Darwin
linux: Linux
windows: Windows
amd64: x86_64
checksum:
name_template: 'checksums.txt'
snapshot:
name_template: "{{ .Tag }}-next"
release:
prerelease: auto
changelog:
sort: asc
filters:
30 changes: 15 additions & 15 deletions runner/data.go
@@ -29,7 +29,7 @@ func messageFromMap(input *dynamic.Message, data *map[string]interface{}) error
return nil
}

func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
md := mtd.GetInputType()
var inputs []*dynamic.Message

@@ -65,16 +65,16 @@ func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) (*[]*dynami
}
}

return &inputs, nil
return inputs, nil
}

func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
inputs := make([]*dynamic.Message, 0, 1)
md := mtd.GetInputType()

// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
return inputs, nil
}

// try to unmarshal input as a single message
@@ -83,19 +83,19 @@ func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescript
if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}
inputs = make([]*dynamic.Message, 1)
inputs[0] = singleMessage

return &inputs, nil
inputs = append(inputs, singleMessage)

return inputs, nil
}

func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
inputs := make([]*dynamic.Message, 0)
md := mtd.GetInputType()

// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
return inputs, nil
}

// try to unmarshal input as several count-delimited messages
@@ -115,15 +115,15 @@ func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescrip
inputs = append(inputs, msg)
}

return &inputs, nil
return inputs, nil
}

func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
inputs, err := createPayloadsFromBinCountDelimited(binData, mtd)

if err == nil && len(*inputs) > 0 {
if err == nil && len(inputs) > 0 {
return inputs, err
}

return createPayloadsFromBinSingleMessage(binData, mtd)
}
}
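
The payload helpers in data.go above now return a plain []*dynamic.Message rather than a *[]*dynamic.Message. A Go slice is already a small header passed by value, so the pointer indirection added nothing and forced callers to dereference with (*inputs)[i]. A minimal standalone sketch of how the plain-slice form reads at a call site, using a hypothetical makeItems helper rather than ghz's own types:

package main

import "fmt"

// makeItems mirrors the pattern of the change above: return a plain slice
// ([]T) instead of a pointer to a slice (*[]T).
func makeItems(n int) ([]string, error) {
	items := make([]string, 0, n)
	for i := 0; i < n; i++ {
		items = append(items, fmt.Sprintf("item-%d", i))
	}
	return items, nil
}

func main() {
	items, err := makeItems(3)
	if err != nil {
		panic(err)
	}
	// Callers can len(), index, and range directly; no (*items)[i] dereference.
	for _, it := range items {
		fmt.Println(it)
	}
}
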
40 changes: 20 additions & 20 deletions runner/data_test.go
@@ -71,8 +71,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single element from map for client streaming", func(t *testing.T) {
@@ -84,8 +84,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice of messages from slice for client streaming", func(t *testing.T) {
@@ -102,7 +102,7 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
assert.Len(t, inputs, 2)
})

t.Run("fail on invalid shape of data in slice for client streaming", func(t *testing.T) {
@@ -142,7 +142,7 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 3)
assert.Len(t, inputs, 3)
})

t.Run("create slice with single object from map for unary with camelCase property", func(t *testing.T) {
@@ -154,8 +154,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single object from map for unary with snake_case property", func(t *testing.T) {
@@ -167,8 +167,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single object from map for unary with nested camelCase property", func(t *testing.T) {
@@ -183,8 +183,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single object from map for unary with nested snake_case property", func(t *testing.T) {
@@ -199,8 +199,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice from single message binary data", func(t *testing.T) {
@@ -213,8 +213,8 @@ func TestData_createPayloads(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.EqualValues(t, msg1.GetName(), (*inputs)[0].GetFieldByName("name"))
assert.Len(t, inputs, 1)
assert.EqualValues(t, msg1.GetName(), inputs[0].GetFieldByName("name"))
})

t.Run("create slice from count-delimited binary data", func(t *testing.T) {
@@ -231,9 +231,9 @@ func TestData_createPayloads(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
assert.EqualValues(t, msg1.GetName(), (*inputs)[0].GetFieldByName("name"))
assert.EqualValues(t, msg2.GetName(), (*inputs)[1].GetFieldByName("name"))
assert.Len(t, inputs, 2)
assert.EqualValues(t, msg1.GetName(), inputs[0].GetFieldByName("name"))
assert.EqualValues(t, msg2.GetName(), inputs[1].GetFieldByName("name"))
})

t.Run("on empty binary data returns empty slice", func(t *testing.T) {
@@ -242,6 +242,6 @@ func TestData_createPayloads(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 0)
assert.Len(t, inputs, 0)
})
}
22 changes: 10 additions & 12 deletions runner/reporter.go
@@ -222,8 +222,8 @@ func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report

rep.Fastest = time.Duration(fastestNum * float64(time.Second))
rep.Slowest = time.Duration(slowestNum * float64(time.Second))
rep.Histogram = histogram(&lats, slowestNum, fastestNum)
rep.LatencyDistribution = latencies(&lats)
rep.Histogram = histogram(lats, slowestNum, fastestNum)
rep.LatencyDistribution = latencies(lats)

rep.Details = make([]ResultDetail, len(r.lats))
for i, num := range r.lats {
@@ -240,15 +240,14 @@ func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report
return rep
}

func latencies(latencies *[]float64) []LatencyDistribution {
lats := *latencies
func latencies(latencies []float64) []LatencyDistribution {
pctls := []int{10, 25, 50, 75, 90, 95, 99}
data := make([]float64, len(pctls))
j := 0
for i := 0; i < len(lats) && j < len(pctls); i++ {
current := i * 100 / len(lats)
for i := 0; i < len(latencies) && j < len(pctls); i++ {
current := i * 100 / len(latencies)
if current >= pctls[j] {
data[j] = lats[i]
data[j] = latencies[i]
j++
}
}
@@ -262,8 +261,7 @@ func latencies(latencies *[]float64) []LatencyDistribution {
return res
}

func histogram(latencies *[]float64, slowest, fastest float64) []Bucket {
lats := *latencies
func histogram(latencies []float64, slowest, fastest float64) []Bucket {
bc := 10
buckets := make([]float64, bc+1)
counts := make([]int, bc+1)
@@ -274,8 +272,8 @@ func histogram(latencies *[]float64, slowest, fastest float64) []Bucket {
buckets[bc] = slowest
var bi int
var max int
for i := 0; i < len(lats); {
if lats[i] <= buckets[bi] {
for i := 0; i < len(latencies); {
if latencies[i] <= buckets[bi] {
i++
counts[bi]++
if max < counts[bi] {
@@ -290,7 +288,7 @@ func histogram(latencies *[]float64, slowest, fastest float64) []Bucket {
res[i] = Bucket{
Mark: buckets[i],
Count: counts[i],
Frequency: float64(counts[i]) / float64(len(lats)),
Frequency: float64(counts[i]) / float64(len(latencies)),
}
}
return res
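
The latencies and histogram helpers in reporter.go likewise take a plain []float64 now. For reference, the bucketing logic visible in the histogram diff can be exercised on its own; the sketch below is a simplified standalone version (its bucket type stands in for runner.Bucket, the unused max tracking is dropped, and it assumes the input slice is sorted, as Finalize sorts the latencies before calling):

package main

import (
	"fmt"
	"sort"
)

// bucket is a stand-in for runner.Bucket in this sketch.
type bucket struct {
	Mark      float64 // upper latency bound of the bucket, in seconds
	Count     int     // number of observations at or below Mark
	Frequency float64 // Count divided by the total number of observations
}

// histogramSketch mirrors the logic in the diff: ten evenly sized intervals
// between fastest and slowest (eleven marks), with each sorted latency counted
// at the first remaining mark it does not exceed.
func histogramSketch(latencies []float64, slowest, fastest float64) []bucket {
	bc := 10
	buckets := make([]float64, bc+1)
	counts := make([]int, bc+1)
	bs := (slowest - fastest) / float64(bc)
	for i := 0; i < bc; i++ {
		buckets[i] = fastest + bs*float64(i)
	}
	buckets[bc] = slowest
	var bi int
	for i := 0; i < len(latencies); {
		if latencies[i] <= buckets[bi] {
			i++
			counts[bi]++
		} else if bi < len(buckets)-1 {
			bi++
		}
	}
	res := make([]bucket, len(buckets))
	for i := range buckets {
		res[i] = bucket{
			Mark:      buckets[i],
			Count:     counts[i],
			Frequency: float64(counts[i]) / float64(len(latencies)),
		}
	}
	return res
}

func main() {
	lats := []float64{0.03, 0.01, 0.02, 0.02, 0.10, 0.05, 0.08, 0.09}
	sort.Float64s(lats)
	for _, b := range histogramSketch(lats, 0.10, 0.01) {
		fmt.Printf("<= %.3fs: %d (%.1f%%)\n", b.Mark, b.Count, b.Frequency*100)
	}
}
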
42 changes: 34 additions & 8 deletions runner/requester.go
@@ -2,8 +2,10 @@ package runner

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"

@@ -49,6 +51,8 @@ type Requester struct {

reqCounter int64

arrayJSONData []string

stopReason StopReason
lock sync.Mutex
}
@@ -120,6 +124,27 @@ func newRequester(c *RunConfig) (*Requester, error) {
// fill in the rest
reqr.mtd = mtd

// fill in JSON string array data for optimization for non client-streaming
reqr.arrayJSONData = nil
if !c.binary && !reqr.mtd.IsClientStreaming() {
if strings.IndexRune(string(c.data), '[') == 0 { // it's an array
var dat []map[string]interface{}
if err := json.Unmarshal(c.data, &dat); err != nil {
return nil, err
}

reqr.arrayJSONData = make([]string, len(dat))
for i, d := range dat {
var strd []byte
if strd, err = json.Marshal(d); err != nil {
return nil, err
}

reqr.arrayJSONData[i] = string(strd)
}
}
}

return reqr, nil
}

@@ -270,14 +295,15 @@ func (b *Requester) runWorkers() error {
}

w := Worker{
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
stopCh: b.stopCh,
qpsTick: b.qpsTick,
reqCounter: &b.reqCounter,
nReq: nReqPerWorker,
workerID: wID,
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
stopCh: b.stopCh,
qpsTick: b.qpsTick,
reqCounter: &b.reqCounter,
nReq: nReqPerWorker,
workerID: wID,
arrayJSONData: b.arrayJSONData,
}

n++ // increment connection counter
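
The arrayJSONData field introduced above caches each element of a JSON array payload as its own string when the data is an array and the method is not client-streaming, so a worker can reuse one pre-serialized element per request instead of re-unmarshalling the whole array on every call (the worker-side lookup lives in a file not shown in this view). A standalone sketch of the same pre-splitting step, using a hypothetical splitJSONArray helper:

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// splitJSONArray mirrors the newRequester() logic above: if the raw data is a
// JSON array, decode it once and re-marshal each element to its own string.
// It returns nil for non-array data, matching the nil arrayJSONData case.
func splitJSONArray(data []byte) ([]string, error) {
	if strings.IndexRune(string(data), '[') != 0 {
		return nil, nil
	}
	var dat []map[string]interface{}
	if err := json.Unmarshal(data, &dat); err != nil {
		return nil, err
	}
	out := make([]string, len(dat))
	for i, d := range dat {
		b, err := json.Marshal(d)
		if err != nil {
			return nil, err
		}
		out[i] = string(b)
	}
	return out, nil
}

func main() {
	data := []byte(`[{"name":"a"},{"name":"b"},{"name":"c"}]`)
	parts, err := splitJSONArray(data)
	if err != nil {
		panic(err)
	}
	// A worker could then reuse one element per request, for example by indexing
	// parts with a request counter modulo len(parts); that lookup is not part of
	// the diffs shown on this page.
	for i, p := range parts {
		fmt.Println(i, p)
	}
}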

