From f045d2d4188659b1a86e737c24e4ae7a1db9fd54 Mon Sep 17 00:00:01 2001
From: Bojan
Date: Wed, 10 Apr 2019 22:00:01 -0300
Subject: [PATCH] start work on potential support for multiple connections

---
 cmd/ghz/main.go              |   2 +
 runner/call_template_data.go |   4 +-
 runner/options.go            |   3 +
 runner/requester.go          | 279 ++++++++---------------------------
 runner/worker.go             | 233 +++++++++++++++++++++++++++++
 5 files changed, 301 insertions(+), 220 deletions(-)
 create mode 100644 runner/worker.go

diff --git a/cmd/ghz/main.go b/cmd/ghz/main.go
index a4e85381..ebed801f 100644
--- a/cmd/ghz/main.go
+++ b/cmd/ghz/main.go
@@ -45,6 +45,8 @@ var (
 	z = kingpin.Flag("duration", "Duration of application to send requests. When duration is reached, application stops and exits. If duration is specified, n is ignored. Examples: -z 10s -z 3m.").Short('z').Default("0").Duration()
 	x = kingpin.Flag("max-duration", "Maximum duration of application to send requests with n setting respected. If duration is reached before n requests are completed, application stops and exits. Examples: -x 10s -x 3m.").Short('x').Default("0").Duration()
 
+	conns = kingpin.Flag("connections", "Number of connections to use. Concurrency is distributed evenly to all the connections. Default is 1.").Default("1").Uint()
+
 	data     = kingpin.Flag("data", "The call data as stringified JSON. If the value is '@' then the request contents are read from stdin.").Short('d').PlaceHolder(" ").String()
 	dataPath = kingpin.Flag("data-file", "File path for call data JSON file. Examples: /home/user/file.json or ./file.json.").Short('D').PlaceHolder("PATH").PlaceHolder(" ").String()
 	binData  = kingpin.Flag("binary", "The call data comes as serialized binary message read from stdin.").Short('b').Default("false").Bool()
diff --git a/runner/call_template_data.go b/runner/call_template_data.go
index 7c25aaec..a7c37317 100644
--- a/runner/call_template_data.go
+++ b/runner/call_template_data.go
@@ -11,6 +11,7 @@ import (
 
 // call template data
 type callTemplateData struct {
+	WorkerID           string // the unique worker ID / number
 	RequestNumber      int64  // unique incremented request number for each request
 	FullyQualifiedName string // fully-qualified name of the method call
 	MethodName         string // shorter call method name
@@ -24,10 +25,11 @@ type callTemplateData struct {
 }
 
 // newCallTemplateData returns new call template data
-func newCallTemplateData(mtd *desc.MethodDescriptor, reqNum int64) *callTemplateData {
+func newCallTemplateData(mtd *desc.MethodDescriptor, workerID string, reqNum int64) *callTemplateData {
 	now := time.Now()
 
 	return &callTemplateData{
+		WorkerID:           workerID,
 		RequestNumber:      reqNum,
 		FullyQualifiedName: mtd.GetFullyQualifiedName(),
 		MethodName:         mtd.GetName(),
diff --git a/runner/options.go b/runner/options.go
index bd33769e..b2068456 100644
--- a/runner/options.go
+++ b/runner/options.go
@@ -40,6 +40,9 @@ type RunConfig struct {
 	c   int
 	qps int
 
+	// number of connections
+	nConns int
+
 	// timeouts
 	z       time.Duration
 	timeout time.Duration
diff --git a/runner/requester.go b/runner/requester.go
index 9838713d..a60c70dc 100644
--- a/runner/requester.go
+++ b/runner/requester.go
@@ -3,9 +3,8 @@ package runner
 import (
 	"context"
 	"fmt"
-	"io"
+	"strconv"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/bojand/ghz/protodesc"
@@ -34,8 +33,9 @@ type callResult struct {
 
 // Requester is used for doing the requests
 type Requester struct {
-	cc   *grpc.ClientConn
-	stub grpcdynamic.Stub
+	cc    []*grpc.ClientConn
+	stubs []grpcdynamic.Stub
+
 	mtd *desc.MethodDescriptor
 
 	reporter *Reporter
@@ -67,6 +67,13 @@ func newRequester(c *RunConfig) (*Requester, error) {
 		stopReason: ReasonNormalEnd,
 		results:    make(chan *callResult, min(c.c*1000, maxResult)),
 		stopCh:     make(chan bool, c.c),
+		cc:         make([]*grpc.ClientConn, 0, c.nConns),
+		stubs:      make([]grpcdynamic.Stub, 0, c.nConns),
+	}
+
+	// default to a single connection if not set
+	if reqr.config.nConns <= 0 {
+		reqr.config.nConns = 1
 	}
 
 	if c.proto != "" {
@@ -125,14 +132,19 @@ func newRequester(c *RunConfig) (*Requester, error) {
 func (b *Requester) Run() (*Report, error) {
 	start := time.Now()
 
-	cc, err := b.openClientConn()
+	cc, err := b.openClientConns()
 	if err != nil {
 		return nil, err
 	}
 
 	b.lock.Lock()
 	b.start = start
-	b.stub = grpcdynamic.NewStub(cc)
+
+	for n := 0; n < b.config.nConns; n++ {
+		stub := grpcdynamic.NewStub(cc[n])
+		b.stubs = append(b.stubs, stub)
+	}
+
 	b.reporter = newReporter(b.results, b.config)
 	b.lock.Unlock()
 
@@ -143,7 +155,7 @@ func (b *Requester) Run() (*Report, error) {
 	err = b.runWorkers()
 
 	report := b.Finish()
-	b.closeClientConn()
+	b.closeClientConns()
 
 	return report, err
 }
@@ -159,7 +171,7 @@ func (b *Requester) Stop(reason StopReason) {
 	b.stopReason = reason
 	b.lock.Unlock()
 
-	b.closeClientConn()
+	b.closeClientConns()
 }
 
 // Finish finishes the test run
@@ -173,27 +185,37 @@ func (b *Requester) Finish() *Report {
 	return b.reporter.Finalize(b.stopReason, total)
 }
 
-func (b *Requester) openClientConn() (*grpc.ClientConn, error) {
+func (b *Requester) openClientConns() ([]*grpc.ClientConn, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
-	if b.cc != nil {
+
+	if len(b.cc) == b.config.nConns {
 		return b.cc, nil
 	}
-	cc, err := b.newClientConn(true)
-	if err != nil {
-		return nil, err
+
+	for n := 0; n < b.config.nConns; n++ {
+		c, err := b.newClientConn(true)
+		if err != nil {
+			return nil, err
+		}
+
+		b.cc = append(b.cc, c)
 	}
-	b.cc = cc
+
 	return b.cc, nil
 }
 
-func (b *Requester) closeClientConn() {
+func (b *Requester) closeClientConns() {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 	if b.cc == nil {
 		return
 	}
-	_ = b.cc.Close()
+
+	for _, cc := range b.cc {
+		_ = cc.Close()
+	}
+
 	b.cc = nil
 }
@@ -238,223 +260,42 @@ func (b *Requester) runWorkers() error {
 	}
 
 	errC := make(chan error, b.config.c)
+
+	n := 0 // connection counter
+
 	// Ignore the case where b.N % b.C != 0.
 	for i := 0; i < b.config.c; i++ {
-		go func() {
-			errC <- b.runWorker(nReqPerWorker)
-		}()
-	}
-	var err error
-	for i := 0; i < b.config.c; i++ {
-		err = multierr.Append(err, <-errC)
-	}
-	return err
-}
-
-func (b *Requester) runWorker(n int) error {
-	var throttle <-chan time.Time
-	if b.config.qps > 0 {
-		throttle = time.Tick(b.qpsTick)
-	}
-
-	var err error
-	for i := 0; i < n; i++ {
-		// Check if application is stopped. Do not send into a closed channel.
-		select {
-		case <-b.stopCh:
-			return nil
-		default:
-			if b.config.qps > 0 {
-				<-throttle
-			}
-			err = multierr.Append(err, b.makeRequest())
+		wID := "g" + strconv.Itoa(i) + "c" + strconv.Itoa(n)
+		w := Worker{
+			stub:       b.stubs[n],
+			mtd:        b.mtd,
+			config:     b.config,
+			stopCh:     b.stopCh,
+			qpsTick:    b.qpsTick,
+			reqCounter: &b.reqCounter,
+			nReq:       nReqPerWorker,
+			workerID:   wID,
 		}
-	}
-	return err
-}
-
-func (b *Requester) makeRequest() error {
-
-	reqNum := atomic.AddInt64(&b.reqCounter, 1)
-	ctd := newCallTemplateData(b.mtd, reqNum)
+		n++ // increment connection counter
 
-	var input *dynamic.Message
-	var streamInput *[]*dynamic.Message
-
-	if !b.config.binary {
-		data, err := ctd.executeData(string(b.config.data))
-		if err != nil {
-			return err
-		}
-		input, streamInput, err = createPayloads(string(data), b.mtd)
-		if err != nil {
-			return err
-		}
-	} else {
-		var err error
-		input, streamInput, err = createPayloadsFromBin(b.config.data, b.mtd)
-		if err != nil {
-			return err
+		// wrap around if needed
+		if n == b.config.nConns {
+			n = 0
 		}
-	}
 
-	mdMap, err := ctd.executeMetadata(string(b.config.metadata))
-	if err != nil {
-		return err
-	}
-
-	var reqMD *metadata.MD
-	if mdMap != nil && len(*mdMap) > 0 {
-		md := metadata.New(*mdMap)
-		reqMD = &md
-	}
-
-	ctx := context.Background()
-	var cancel context.CancelFunc
-
-	if b.config.timeout > 0 {
-		ctx, cancel = context.WithTimeout(ctx, b.config.timeout)
-	} else {
-		ctx, cancel = context.WithCancel(ctx)
-	}
-	defer cancel()
-
-	// include the metadata
-	if reqMD != nil {
-		ctx = metadata.NewOutgoingContext(ctx, *reqMD)
+		go func() {
+			errC <- w.runWorker()
+		}()
 	}
 
-	// RPC errors are handled via stats handler
-
-	if b.mtd.IsClientStreaming() && b.mtd.IsServerStreaming() {
-		_ = b.makeBidiRequest(&ctx, streamInput)
-	}
-	if b.mtd.IsClientStreaming() {
-		_ = b.makeClientStreamingRequest(&ctx, streamInput)
-	}
-	if b.mtd.IsServerStreaming() {
-		_ = b.makeServerStreamingRequest(&ctx, input)
+	var err error
+	for i := 0; i < b.config.c; i++ {
+		err = multierr.Append(err, <-errC)
 	}
-
-	// TODO: handle response?
-	_, _ = b.stub.InvokeRpc(ctx, b.mtd, input)
 
 	return err
 }
 
-func (b *Requester) makeClientStreamingRequest(ctx *context.Context, input *[]*dynamic.Message) error {
-	str, err := b.stub.InvokeRpcClientStream(*ctx, b.mtd)
-	counter := 0
-	// TODO: need to handle and propagate errors
-	for err == nil {
-		streamInput := *input
-		inputLen := len(streamInput)
-		if input == nil || inputLen == 0 {
-			// TODO: need to handle error
-			_, _ = str.CloseAndReceive()
-			break
-		}
-
-		if counter == inputLen {
-			// TODO: need to handle error
-			_, _ = str.CloseAndReceive()
-			break
-		}
-
-		payload := streamInput[counter]
-
-		var wait <-chan time.Time
-		if b.config.streamInterval > 0 {
-			wait = time.Tick(b.config.streamInterval)
-			<-wait
-		}
-
-		err = str.SendMsg(payload)
-		if err == io.EOF {
-			// We get EOF on send if the server says "go away"
-			// We have to use CloseAndReceive to get the actual code
-			// TODO: need to handle error
-			_, _ = str.CloseAndReceive()
-			break
-		}
-		counter++
-	}
-	return nil
-}
-
-func (b *Requester) makeServerStreamingRequest(ctx *context.Context, input *dynamic.Message) error {
-	str, err := b.stub.InvokeRpcServerStream(*ctx, b.mtd, input)
-	// TODO: need to handle and propagate errors
-	for err == nil {
-		_, err := str.RecvMsg()
-		if err != nil {
-			if err == io.EOF {
-				err = nil
-			}
-			break
-		}
-	}
-	return nil
-}
-
-func (b *Requester) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Message) error {
-	str, err := b.stub.InvokeRpcBidiStream(*ctx, b.mtd)
-	if err != nil {
-		return err
-	}
-
-	counter := 0
-
-	streamInput := *input
-	inputLen := len(streamInput)
-
-	recvDone := make(chan bool)
-
-	if input == nil || inputLen == 0 {
-		// TODO: need to handle error
-		_ = str.CloseSend()
-		return nil
-	}
-
-	go func() {
-		for {
-			_, err := str.RecvMsg()
-
-			if err != nil {
-				close(recvDone)
-				break
-			}
-		}
-	}()
-
-	// TODO: need to handle and propagate errors
-	for err == nil {
-		if counter == inputLen {
-			// TODO: need to handle error
-			_ = str.CloseSend()
-			break
-		}
-
-		payload := streamInput[counter]
-
-		var wait <-chan time.Time
-		if b.config.streamInterval > 0 {
-			wait = time.Tick(b.config.streamInterval)
-			<-wait
-		}
-
-		err = str.SendMsg(payload)
-		counter++
-	}
-
-	if err == nil {
-		<-recvDone
-	}
-
-	return nil
-}
-
 func min(a, b int) int {
 	if a < b {
 		return a
diff --git a/runner/worker.go b/runner/worker.go
new file mode 100644
index 00000000..8b9cf81a
--- /dev/null
+++ b/runner/worker.go
@@ -0,0 +1,233 @@
+package runner
+
+import (
+	"context"
+	"io"
+	"sync/atomic"
+	"time"
+
+	"github.com/jhump/protoreflect/desc"
+	"github.com/jhump/protoreflect/dynamic"
+	"github.com/jhump/protoreflect/dynamic/grpcdynamic"
+	"go.uber.org/multierr"
+	"google.golang.org/grpc/metadata"
+)
+
+// Worker is used for doing a single stream of requests in parallel
+type Worker struct {
+	stub grpcdynamic.Stub
+	mtd  *desc.MethodDescriptor
+
+	config     *RunConfig
+	stopCh     chan bool
+	qpsTick    time.Duration
+	reqCounter *int64
+	nReq       int
+	workerID   string
+}
+
+func (w *Worker) runWorker() error {
+	var throttle <-chan time.Time
+	if w.config.qps > 0 {
+		throttle = time.Tick(w.qpsTick)
+	}
+
+	var err error
+	for i := 0; i < w.nReq; i++ {
+		// Check if application is stopped. Do not send into a closed channel.
+		select {
+		case <-w.stopCh:
+			return nil
+		default:
+			if w.config.qps > 0 {
+				<-throttle
+			}
+
+			rErr := w.makeRequest()
+
+			err = multierr.Append(err, rErr)
+		}
+	}
+	return err
+}
+
+func (w *Worker) makeRequest() error {
+
+	reqNum := atomic.AddInt64(w.reqCounter, 1)
+
+	ctd := newCallTemplateData(w.mtd, w.workerID, reqNum)
+
+	var input *dynamic.Message
+	var streamInput *[]*dynamic.Message
+
+	if !w.config.binary {
+		data, err := ctd.executeData(string(w.config.data))
+		if err != nil {
+			return err
+		}
+		input, streamInput, err = createPayloads(string(data), w.mtd)
+		if err != nil {
+			return err
+		}
+	} else {
+		var err error
+		input, streamInput, err = createPayloadsFromBin(w.config.data, w.mtd)
+		if err != nil {
+			return err
+		}
+	}
+
+	mdMap, err := ctd.executeMetadata(string(w.config.metadata))
+	if err != nil {
+		return err
+	}
+
+	var reqMD *metadata.MD
+	if mdMap != nil && len(*mdMap) > 0 {
+		md := metadata.New(*mdMap)
+		reqMD = &md
+	}
+
+	ctx := context.Background()
+	var cancel context.CancelFunc
+
+	if w.config.timeout > 0 {
+		ctx, cancel = context.WithTimeout(ctx, w.config.timeout)
+	} else {
+		ctx, cancel = context.WithCancel(ctx)
+	}
+	defer cancel()
+
+	// include the metadata
+	if reqMD != nil {
+		ctx = metadata.NewOutgoingContext(ctx, *reqMD)
+	}
+
+	// RPC errors are handled via stats handler
+
+	if w.mtd.IsClientStreaming() && w.mtd.IsServerStreaming() {
+		_ = w.makeBidiRequest(&ctx, streamInput)
+	}
+	if w.mtd.IsClientStreaming() {
+		_ = w.makeClientStreamingRequest(&ctx, streamInput)
+	}
+	if w.mtd.IsServerStreaming() {
+		_ = w.makeServerStreamingRequest(&ctx, input)
+	}
+
+	// TODO: handle response?
+	_, _ = w.stub.InvokeRpc(ctx, w.mtd, input)
+	return err
+}
+
+func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input *[]*dynamic.Message) error {
+	str, err := w.stub.InvokeRpcClientStream(*ctx, w.mtd)
+	counter := 0
+	// TODO: need to handle and propagate errors
+	for err == nil {
+		streamInput := *input
+		inputLen := len(streamInput)
+		if input == nil || inputLen == 0 {
+			// TODO: need to handle error
+			_, _ = str.CloseAndReceive()
+			break
+		}
+
+		if counter == inputLen {
+			// TODO: need to handle error
+			_, _ = str.CloseAndReceive()
+			break
+		}
+
+		payload := streamInput[counter]
+
+		var wait <-chan time.Time
+		if w.config.streamInterval > 0 {
+			wait = time.Tick(w.config.streamInterval)
+			<-wait
+		}
+
+		err = str.SendMsg(payload)
+		if err == io.EOF {
+			// We get EOF on send if the server says "go away"
+			// We have to use CloseAndReceive to get the actual code
+			// TODO: need to handle error
+			_, _ = str.CloseAndReceive()
+			break
+		}
+		counter++
+	}
+	return nil
+}
+
+func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic.Message) error {
+	str, err := w.stub.InvokeRpcServerStream(*ctx, w.mtd, input)
+	// TODO: need to handle and propagate errors
+	for err == nil {
+		_, err := str.RecvMsg()
+		if err != nil {
+			if err == io.EOF {
+				err = nil
+			}
+			break
+		}
+	}
+	return nil
+}
+
+func (w *Worker) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Message) error {
+	str, err := w.stub.InvokeRpcBidiStream(*ctx, w.mtd)
+	if err != nil {
+		return err
+	}
+
+	counter := 0
+
+	streamInput := *input
+	inputLen := len(streamInput)
+
+	recvDone := make(chan bool)
+
+	if input == nil || inputLen == 0 {
+		// TODO: need to handle error
+		_ = str.CloseSend()
+		return nil
+	}
+
+	go func() {
+		for {
+			_, err := str.RecvMsg()
+
+			if err != nil {
+				close(recvDone)
+				break
+			}
+		}
+	}()
+
+	// TODO: need to handle and propagate errors
+	for err == nil {
+		if counter == inputLen {
+			// TODO: need to handle error
+			_ = str.CloseSend()
+			break
+		}
+
+		payload := streamInput[counter]
+
+		var wait <-chan time.Time
+		if w.config.streamInterval > 0 {
+			wait = time.Tick(w.config.streamInterval)
+			<-wait
+		}
+
+		err = str.SendMsg(payload)
+		counter++
+	}
+
+	if err == nil {
+		<-recvDone
+	}
+
+	return nil
+}
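
Usage sketch: with this patch, the workers started for -c concurrency are
assigned to connections round-robin (wrapping at the --connections count), and
{{.WorkerID}} (e.g. "g0c0" for goroutine 0 on connection 0) becomes available
in templated call data. The --connections flag is added above; the other flags
are ghz's existing CLI; the proto file, method, and server address below are
placeholder values:

  # spread 50 concurrent workers over 5 client connections
  ghz --proto ./greeter.proto --call helloworld.Greeter.SayHello \
      -c 50 --connections 5 -n 10000 \
      -d '{"name":"{{.WorkerID}}"}' \
      0.0.0.0:50051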