diff --git a/sdk/data/aztables/testdata/perf/batch.go b/sdk/data/aztables/testdata/perf/batch.go new file mode 100644 index 000000000000..09522e9452b8 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/batch.go @@ -0,0 +1,173 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +type batchTestOptions struct { + fullEDM bool + clientSharing bool + count int +} + +var batchTestOpts batchTestOptions = batchTestOptions{ + fullEDM: false, + clientSharing: false, + count: 100, +} + +// batchTestRegister is called once per process +func batchTestRegister() { + flag.IntVar(&batchTestOpts.count, "count", 100, "Number of entities to batch create") + flag.IntVar(&batchTestOpts.count, "c", 100, "Number of entities to batch create") + flag.BoolVar(&batchTestOpts.fullEDM, "full-edm", false, "whether to use entities that utilize all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&batchTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. 
Default is to share a single ServiceClient") +} + +type batchTestGlobal struct { + perf.PerfTestOptions + tableName string +} + +// NewBatchTest is called once per process +func NewBatchTest(ctx context.Context, options perf.PerfTestOptions) (perf.GlobalPerfTest, error) { + guid, err := uuid.New() + if err != nil { + return nil, err + } + tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) + d := &batchTestGlobal{ + PerfTestOptions: options, + tableName: tableName, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return nil, err + } + _, err = svcClient.CreateTable(context.Background(), d.tableName, nil) + if err != nil { + return nil, err + } + + return d, nil +} + +func (d *batchTestGlobal) GlobalCleanup(ctx context.Context) error { + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return err + } + + _, err = svcClient.DeleteTable(context.Background(), d.tableName, nil) + return err +} + +type batchEntityPerfTest struct { + *batchTestGlobal + perf.PerfTestOptions + baseEDMEntity aztables.EDMEntity + baseStringEntity map[string]string + tableClient *aztables.Client +} + +// NewPerfTest is called once per goroutine +func (g *batchTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &batchEntityPerfTest{ + batchTestGlobal: g, + PerfTestOptions: *options, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' 
could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, &aztables.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: d.PerfTestOptions.Transporter, + }, + }) + if err != nil { + return nil, err + } + + d.tableClient = svcClient.NewClient(g.tableName) + + pk, err := uuid.New() + if err != nil { + return nil, err + } + + stringEntity["PartitionKey"] = pk.String() + + d.baseStringEntity = stringEntity + + edmEntity := fullEdm + edmEntity.PartitionKey = pk.String() + d.baseEDMEntity = edmEntity + + return d, nil +} + +func (d *batchEntityPerfTest) Run(ctx context.Context) error { + batch := make([]aztables.TransactionAction, batchTestOpts.count) + + for i := 0; i < batchTestOpts.count; i++ { + + if batchTestOpts.fullEDM { + d.baseEDMEntity.RowKey = fmt.Sprint(i) + marshalled, err := json.Marshal(d.baseEDMEntity) + if err != nil { + return err + } + + batch[i] = aztables.TransactionAction{ + Entity: marshalled, + ActionType: aztables.TransactionTypeUpdateMerge, + } + } else { + d.baseStringEntity["RowKey"] = fmt.Sprint(i) + marshalled, err := json.Marshal(d.baseStringEntity) + if err != nil { + return err + } + + batch[i] = aztables.TransactionAction{ + Entity: marshalled, + ActionType: aztables.TransactionTypeUpdateMerge, + } + } + + } + + _, err := d.tableClient.SubmitTransaction(ctx, batch, nil) + return err +} + +func (*batchEntityPerfTest) Cleanup(ctx context.Context) error { + return nil +} diff --git a/sdk/data/aztables/testdata/perf/go.mod b/sdk/data/aztables/testdata/perf/go.mod new file mode 100644 index 000000000000..c9867fb8e9ad --- /dev/null +++ b/sdk/data/aztables/testdata/perf/go.mod @@ -0,0 +1,20 @@ +module github.com/Azure/azure-sdk-for-go/sdk/data/aztables/testdata/perf + +go 1.17 + +replace github.com/Azure/azure-sdk-for-go/sdk/internal => ../../../../internal + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 + github.com/Azure/azure-sdk-for-go/sdk/data/aztables v0.6.0 + 
github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.7.0 // indirect + golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect + golang.org/x/text v0.3.7 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) diff --git a/sdk/data/aztables/testdata/perf/go.sum b/sdk/data/aztables/testdata/perf/go.sum new file mode 100644 index 000000000000..b9d57f12bc59 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/go.sum @@ -0,0 +1,60 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 h1:8wVJL0HUP5yDFXvotdewORTw7Yu88JbreWN/mobSvsQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0 h1:bLRntPH25SkY1uZ/YZW+dmxNky9r1fAHvDFrzluo+4Q= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0/go.mod h1:TmXReXZ9yPp5D5TBRMTAtyz+UyOl15Py4hL5E5p6igQ= +github.com/Azure/azure-sdk-for-go/sdk/data/aztables v0.6.0 h1:aSPOq3mqbWTXPSQhXAwgsJas4ZdyapBn+uWA54HZRto= +github.com/Azure/azure-sdk-for-go/sdk/data/aztables v0.6.0/go.mod h1:fRf7GSd+2fcFo7pa3QndmE29N9ndRxJK4TosS72TpdI= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= +github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= +github.com/golang-jwt/jwt v3.2.1+incompatible 
h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= +github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod 
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sdk/data/aztables/testdata/perf/insert_entity.go b/sdk/data/aztables/testdata/perf/insert_entity.go new file mode 100644 index 000000000000..86594990b047 --- /dev/null +++ b/sdk/data/aztables/testdata/perf/insert_entity.go @@ -0,0 +1,175 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. 
+ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +var stringEntity = map[string]string{ + "PartitionKey": "", + "RowKey": "", + "StringTypeProperty1": "StringTypeProperty", + "StringTypeProperty2": "1970-10-04T00:00:00+00:00", + "StringTypeProperty3": "c9da6455-213d-42c9-9a79-3e9149a57833", + "StringTypeProperty4": "BinaryTypeProperty", + "StringTypeProperty5": fmt.Sprint(int64(1)<<32 + 1), + "StringTypeProperty6": "200.23", + "StringTypeProperty7": "5", +} + +var fullEdm = aztables.EDMEntity{ + Entity: aztables.Entity{ + PartitionKey: "", + RowKey: "", + }, + Properties: map[string]interface{}{ + "StringTypeProperty": "StringTypeProperty", + "DatetimeTypeProperty": aztables.EDMDateTime(time.Now()), + "GuidTypeProperty": aztables.EDMGUID("c9da6455-213d-42c9-9a79-3e9149a57833"), + "BinaryTypeProperty": aztables.EDMBinary([]byte("BinaryTypeProperty")), + "Int64TypeProperty": aztables.EDMInt64(1<<32 + 1), + "DoubleTypeProperty": 200.23, + "IntTypeProperty": 5, + }, +} + +type insertEntityTestOptions struct { + fullEDM bool + clientSharing bool +} + +var insertTestOpts insertEntityTestOptions = insertEntityTestOptions{ + fullEDM: false, + clientSharing: false, +} + +// insertTestRegister is called once per process +func insertTestRegister() { + flag.BoolVar(&insertTestOpts.fullEDM, "full-edm", false, "whether to use entities that utilize all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&insertTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. 
Default is to share a single ServiceClient") +} + +type insertEntityTestGlobal struct { + perf.PerfTestOptions + tableName string +} + +// NewInsertEntityTest is called once per process +func NewInsertEntityTest(ctx context.Context, options perf.PerfTestOptions) (perf.GlobalPerfTest, error) { + guid, err := uuid.New() + if err != nil { + return nil, err + } + tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) + d := &insertEntityTestGlobal{ + PerfTestOptions: options, + tableName: tableName, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return nil, err + } + _, err = svcClient.CreateTable(context.Background(), d.tableName, nil) + if err != nil { + return nil, err + } + + return d, nil +} + +func (d *insertEntityTestGlobal) GlobalCleanup(ctx context.Context) error { + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return err + } + + _, err = svcClient.DeleteTable(context.Background(), d.tableName, nil) + return err +} + +type insertEntityPerfTest struct { + *insertEntityTestGlobal + perf.PerfTestOptions + entity []byte + tableClient *aztables.Client +} + +// NewPerfTest is called once per goroutine +func (g *insertEntityTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &insertEntityPerfTest{ + insertEntityTestGlobal: g, + PerfTestOptions: *options, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' 
could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, &aztables.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: d.PerfTestOptions.Transporter, + }, + }) + if err != nil { + return nil, err + } + + d.tableClient = svcClient.NewClient(g.tableName) + + rk, err := uuid.New() + if err != nil { + return nil, err + } + pk, err := uuid.New() + if err != nil { + return nil, err + } + + stringEntity["PartitionKey"] = pk.String() + stringEntity["RowKey"] = rk.String() + + bytes, err := json.Marshal(stringEntity) + if err != nil { + return nil, err + } + + d.entity = bytes + + return d, nil +} + +func (d *insertEntityPerfTest) Run(ctx context.Context) error { + _, err := d.tableClient.InsertEntity(ctx, d.entity, &aztables.InsertEntityOptions{ + UpdateMode: aztables.EntityUpdateModeMerge, + }) + return err +} + +func (*insertEntityPerfTest) Cleanup(ctx context.Context) error { + return nil +} diff --git a/sdk/data/aztables/testdata/perf/list_entities.go b/sdk/data/aztables/testdata/perf/list_entities.go new file mode 100644 index 000000000000..1344d601912b --- /dev/null +++ b/sdk/data/aztables/testdata/perf/list_entities.go @@ -0,0 +1,186 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. 
+ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +type listEntitiesTestOptions struct { + fullEDM bool + clientSharing bool + count int +} + +var listTestOpts listEntitiesTestOptions = listEntitiesTestOptions{ + fullEDM: false, + clientSharing: false, + count: 100, +} + +// listTestRegister is called once per process +func listTestRegister() { + flag.IntVar(&listTestOpts.count, "count", 100, "Number of entities to list") + flag.IntVar(&listTestOpts.count, "c", 100, "Number of entities to list") + flag.BoolVar(&listTestOpts.fullEDM, "full-edm", false, "whether to use entities that utiliza all EDM types for serialization/deserialization, or only strings. Default is only strings") + flag.BoolVar(&listTestOpts.clientSharing, "no-client-share", false, "create one ServiceClient per test instance. 
Default is to share a single ServiceClient") +} + +type listEntityTestGlobal struct { + perf.PerfTestOptions + tableName string + svcClient *aztables.ServiceClient +} + +// NewListEntitiesTest is called once per process +func NewListEntitiesTest(ctx context.Context, options perf.PerfTestOptions) (perf.GlobalPerfTest, error) { + guid, err := uuid.New() + if err != nil { + return nil, err + } + tableName := fmt.Sprintf("table%s", strings.ReplaceAll(guid.String(), "-", "")) + d := &listEntityTestGlobal{ + PerfTestOptions: options, + tableName: tableName, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return nil, err + } + _, err = svcClient.CreateTable(context.Background(), d.tableName, nil) + if err != nil { + return nil, err + } + d.svcClient = svcClient + + client := d.svcClient.NewClient(d.tableName) + + baseEntityEDM := fullEdm + baseEntityString := stringEntity + + u, err := uuid.New() + if err != nil { + return nil, err + } + + baseEntityEDM.PartitionKey = u.String() + baseEntityString["PartitionKey"] = u.String() + + for i := 0; i < listTestOpts.count; i++ { + if listTestOpts.fullEDM { + u, err := uuid.New() + if err != nil { + return nil, err + } + baseEntityEDM.RowKey = u.String() + + marshalled, err := json.Marshal(baseEntityEDM) + if err != nil { + return nil, err + } + + _, err = client.InsertEntity(ctx, marshalled, nil) + if err != nil { + return nil, err + } + } else { + u, err := uuid.New() + if err != nil { + return nil, err + } + baseEntityString["RowKey"] = u.String() + + marshalled, err := json.Marshal(baseEntityString) + if err != nil { + return nil, err + } + + _, err = client.InsertEntity(ctx, marshalled, nil) + if err != nil { + return nil, err + } + } + } + + return d, nil +} + +func (d 
*listEntityTestGlobal) GlobalCleanup(ctx context.Context) error { + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, nil) + if err != nil { + return err + } + + _, err = svcClient.DeleteTable(context.Background(), d.tableName, nil) + return err +} + +type listEntitiesPerfTest struct { + *listEntityTestGlobal + perf.PerfTestOptions + client *aztables.Client +} + +// NewPerfTest is called once per goroutine +func (g *listEntityTestGlobal) NewPerfTest(ctx context.Context, options *perf.PerfTestOptions) (perf.PerfTest, error) { + d := &listEntitiesPerfTest{ + listEntityTestGlobal: g, + PerfTestOptions: *options, + } + + connStr, ok := os.LookupEnv("AZURE_TABLES_CONNECTION_STRING") + if !ok { + return nil, fmt.Errorf("the environment variable 'AZURE_TABLES_CONNECTION_STRING' could not be found") + } + + svcClient, err := aztables.NewServiceClientFromConnectionString(connStr, &aztables.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: d.PerfTestOptions.Transporter, + }, + }) + if err != nil { + return nil, err + } + + d.client = svcClient.NewClient(g.tableName) + + return d, nil +} + +func (d *listEntitiesPerfTest) Run(ctx context.Context) error { + pager := d.client.List(nil) + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + return err + } + _ = resp + } + return nil +} + +func (*listEntitiesPerfTest) Cleanup(ctx context.Context) error { + return nil +} diff --git a/sdk/data/aztables/testdata/perf/main.go b/sdk/data/aztables/testdata/perf/main.go new file mode 100644 index 000000000000..72eea986c93b --- /dev/null +++ b/sdk/data/aztables/testdata/perf/main.go @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package main + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/internal/perf" +) + +func main() { + perf.Run(map[string]perf.PerfMethods{ + "CreateEntityTest": {Register: insertTestRegister, New: NewInsertEntityTest}, + "ListEntitiesTest": {Register: listTestRegister, New: NewListEntitiesTest}, + "CreateEntityBatchTest": {Register: batchTestRegister, New: NewBatchTest}, + }) +} diff --git a/sdk/internal/perf/README.md b/sdk/internal/perf/README.md index 08cbcab992af..cb319517f25e 100644 --- a/sdk/internal/perf/README.md +++ b/sdk/internal/perf/README.md @@ -3,11 +3,11 @@ The `perf` sub-module provides a singular framework for writing performance test ## Default Command Options -| Flag | Short Flag | Default Value | Variable Name | Description | -| -----| ---------- | ------------- | ------------- | ----------- | -| `--duration` | `-d` | 10 seconds | internal.Duration (`int`) | How long to run an individual performance test | -| `--test-proxies` | `-x` | N/A | internal.TestProxy (`string`) | Whether to run a test against a test proxy. If you want to run against `https` specify with `--test-proxies https`, likewise for `http`. If you want to run normally omit this flag | -| `--warmup` | `-w` | 3 seconds| internal.WarmUp (`int`) | How long to allow the connection to warm up. | +| Flag | Short Flag | Default Value | Description | +| -----| ---------- | ------------- | ----------- | +| `--duration` | `-d` | 10 seconds | How long to run an individual performance test | +| `--test-proxies` | `-x` | N/A | A semicolon separated list of proxy urls. If you want to run normally omit this flag | +| `--warmup` | `-w` | 3 seconds| How long to allow the connection to warm up. 
| ## Adding Performance Tests to an SDK diff --git a/sdk/internal/perf/implementation.go b/sdk/internal/perf/implementation.go index 92f2bf6a30ae..225b5d207f6c 100644 --- a/sdk/internal/perf/implementation.go +++ b/sdk/internal/perf/implementation.go @@ -4,295 +4,36 @@ package perf import ( - "context" - "fmt" - "log" - "os" "strings" "sync" - "text/tabwriter" - "time" ) var ( - debug bool - duration int - testProxyURLs string - warmUpDuration int + // debug is true if --debug is specified + debug bool + // duration is the -d/--duration flag + duration int + // testProxyURLs is the -x/--test-proxy flag, a semi-colon separated list + testProxyURLs string + // warmUpDuration is the -w/--warmup flag + warmUpDuration int + // parallelInstances is the -p/--parallel flag parallelInstances int - wg sync.WaitGroup - numProcesses int -) - -// runTest takes care of the semantics of running a single iteration. It returns the number of times the test ran as an int, the exact number -// of seconds the test ran as a float64, and any errors. -func runTest(p PerfTest, index int, c chan runResult, ID string) { - defer wg.Done() - if debug { - log.Printf("number of proxies %d", len(proxyTransportsSuite)) - } - - // If we are using the test proxy need to set up the in-memory recording. 
- if testProxyURLs != "" { - // First request goes through in Live mode - proxyTransportsSuite[ID].SetMode("live") - err := p.Run(context.Background()) - if err != nil { - c <- runResult{err: err} - } - - // 2nd request goes through in Record mode - proxyTransportsSuite[ID].SetMode("record") - err = proxyTransportsSuite[ID].start() - if err != nil { - panic(err) - } - err = p.Run(context.Background()) - if err != nil { - c <- runResult{err: err} - } - err = proxyTransportsSuite[ID].stop() - if err != nil { - panic(err) - } - - // All ensuing requests go through in Playback mode - proxyTransportsSuite[ID].SetMode("playback") - err = proxyTransportsSuite[ID].start() - if err != nil { - panic(err) - } - } - - if warmUpDuration > 0 { - warmUpStart := time.Now() - warmUpLastPrint := 1.0 - warmUpPerSecondCount := make([]int, 0) - warmupCount := 0 - for time.Since(warmUpStart).Seconds() < float64(warmUpDuration) { - err := p.Run(context.Background()) - if err != nil { - c <- runResult{err: err} - } - warmupCount += 1 - - if time.Since(warmUpStart).Seconds() > warmUpLastPrint { - thisSecondCount := warmupCount - sumInts(warmUpPerSecondCount) - c <- runResult{ - warmup: true, - count: thisSecondCount, - parallelIndex: index, - timeInSeconds: time.Since(warmUpStart).Seconds(), - } - - warmUpPerSecondCount = append(warmUpPerSecondCount, thisSecondCount) - warmUpLastPrint += 1.0 - if int(warmUpLastPrint) == warmUpDuration { - // We can have odd scenarios where we send this - // message, and the final message below right after - warmUpLastPrint += 1.0 - } - } - } - - thisSecondCount := warmupCount - sumInts(warmUpPerSecondCount) - c <- runResult{ - warmup: true, - completed: true, - count: thisSecondCount, - parallelIndex: index, - timeInSeconds: time.Since(warmUpStart).Seconds(), - } - } - - timeStart := time.Now() - totalCount := 0 - lastPrint := 1.0 - perSecondCount := make([]int, 0) - for time.Since(timeStart).Seconds() < float64(duration) { - err := 
p.Run(context.Background()) - if err != nil { - c <- runResult{err: err} - } - totalCount += 1 - - // Every second (roughly) we send an update through the channel - if time.Since(timeStart).Seconds() > lastPrint { - thisCount := totalCount - sumInts(perSecondCount) - c <- runResult{ - count: thisCount, - parallelIndex: index, - completed: false, - timeInSeconds: time.Since(timeStart).Seconds(), - } - lastPrint += 1.0 - perSecondCount = append(perSecondCount, thisCount) - - if int(lastPrint) == duration { - // if we are w/in one second of the end time, we do not - // want to send any more results, we'll just send a final result - lastPrint += 10.0 - } - } - } - - elapsed := time.Since(timeStart).Seconds() - lastSecondCount := totalCount - sumInts(perSecondCount) - c <- runResult{ - completed: true, - count: lastSecondCount, - parallelIndex: index, - timeInSeconds: elapsed, - } - - if testProxyURLs != "" { - // Stop the proxy now - err := proxyTransportsSuite[ID].stop() - if err != nil { - c <- runResult{err: err} - } - proxyTransportsSuite[ID].SetMode("live") - } -} - -// runCleanup takes care of the semantics for tearing down a single iteration of a performance test. 
-func runCleanup(p PerfTest) error { - return p.Cleanup(context.Background()) -} - -// runResult is the result sent back through the channel for updates and final results -type runResult struct { - // number of iterations completed since the previous message - count int - - // The time the update comes from - timeInSeconds float64 - - // if there is an error it will be here - err error - - // true when this is the last result from a go routine - completed bool - // Index of the goroutine - parallelIndex int + // wg is used to keep track of the number of goroutines created + wg sync.WaitGroup - // indicates if result is from a warmup start - warmup bool -} + // number of processes to use, the --maxprocs flag + numProcesses int +) -// parse the TestProxy input into a slice of strings +// parseProxyURLs splits the --test-proxy input with the delimiter ';' func parseProxyURLS() []string { - var ret []string if testProxyURLs == "" { - return ret + return nil } testProxyURLs = strings.TrimSuffix(testProxyURLs, ";") - ret = strings.Split(testProxyURLs, ";") - - return ret -} - -// Spins off each Parallel instance as a separate goroutine, reads messages and runs cleanup/setup methods. 
-func runPerfTest(name string, p NewPerfTest) error { - globalInstance, err := p(context.TODO(), PerfTestOptions{Name: name}) - if err != nil { - return err - } - - var perfTests []PerfTest - var IDs []string - proxyURLS := parseProxyURLS() - - w := tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight) - - messages := make(chan runResult) - for idx := 0; idx < parallelInstances; idx++ { - options := &PerfTestOptions{} - - ID := fmt.Sprintf("%s-%d", name, idx) - IDs = append(IDs, ID) - - if testProxyURLs != "" { - proxyURL := proxyURLS[idx%len(proxyURLS)] - transporter := NewProxyTransport(&TransportOptions{ - TestName: ID, - proxyURL: proxyURL, - }) - options.Transporter = transporter - } else { - options.Transporter = defaultHTTPClient - } - options.parallelIndex = idx - - perfTest, err := globalInstance.NewPerfTest(context.TODO(), options) - if err != nil { - panic(err) - } - perfTests = append(perfTests, perfTest) - } - - for idx, perfTest := range perfTests { - // Create a thread for running a single test - wg.Add(1) - go runTest(perfTest, idx, messages, IDs[idx]) - } - - warmUpStatus := newStatusRunner(time.Now(), warmUpDuration) - durationStatus := newStatusRunner(time.Now().Add(time.Duration(warmUpDuration)*time.Second), duration) - // Read incoming messages and handle status updates - - if warmUpDuration > 0 { - wg.Add(1) - go func() { - warmUpStatus.isWarmup = true - warmUpStatus.printUpdates() - warmUpStatus.printFinalUpdate() - wg.Done() - }() - } - - wg.Add(1) - go func() { - durationStatus.printUpdates() - durationStatus.printFinalUpdate() - wg.Done() - }() - - // Add another goroutine to close the channel after completion - go func() { - wg.Wait() - close(messages) - }() - - for msg := range messages { - if debug { - log.Println("Handling message: ", msg) - } - if msg.err != nil { - panic(msg.err) - } - if msg.warmup { - warmUpStatus.handleMessage(msg, w) - } else { - durationStatus.handleMessage(msg, w) - } - } - - // Run Cleanup on 
each parallel instance - for _, pTest := range perfTests { - err := runCleanup(pTest) - if err != nil { - panic(err) - } - } - - err = globalInstance.GlobalCleanup(context.TODO()) - if err != nil { - return fmt.Errorf("there was an error with the GlobalCleanup method: %v", err.Error()) - } - - return nil + return strings.Split(testProxyURLs, ";") } diff --git a/sdk/internal/perf/perf.go b/sdk/internal/perf/perf.go index c366faca5a67..50f9ff97b74f 100644 --- a/sdk/internal/perf/perf.go +++ b/sdk/internal/perf/perf.go @@ -10,6 +10,8 @@ import ( "net/http" "os" "runtime" + "sync/atomic" + "time" ) func init() { @@ -64,6 +66,35 @@ type PerfTestOptions struct { // parallelIndex is the index of the goroutine parallelIndex int + + // number of warmup operations completed + warmupCount int64 + warmupStart *time.Time + warmupElapsed time.Duration + + // number of operations runCount + runCount int64 + runStart *time.Time + runElapsed time.Duration + + finished bool +} + +func newPerfTestOptions(name string) PerfTestOptions { + return PerfTestOptions{ + Name: name, + warmupStart: &time.Time{}, + runStart: &time.Time{}, + } +} + +// increment does an atomic increment of the warmup or non-warmup performance test +func (p *PerfTestOptions) increment(warmup bool) { + if warmup { + atomic.AddInt64(&p.warmupCount, 1) + } else { + atomic.AddInt64(&p.runCount, 1) + } } // NewPerfTest returns an instance of PerfTest and embeds the given `options` in the struct @@ -77,7 +108,6 @@ type PerfMethods struct { // Run runs an individual test, registers, and parses command line flags func Run(tests map[string]PerfMethods) { - if len(os.Args) < 2 { // Error out and show available perf tests fmt.Println("Available performance tests:") @@ -119,7 +149,8 @@ func Run(tests map[string]PerfMethods) { fmt.Printf("\tRunning %s\n", testNameToRun) - err := runPerfTest(testNameToRun, perfTestToRun.New) + runner := newPerfRunner(perfTestToRun, testNameToRun) + err := runner.Run() if err != nil { 
panic(err) } diff --git a/sdk/internal/perf/perf_runner.go b/sdk/internal/perf/perf_runner.go new file mode 100644 index 000000000000..01cef9bc40d4 --- /dev/null +++ b/sdk/internal/perf/perf_runner.go @@ -0,0 +1,457 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package perf + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "sync" + "sync/atomic" + "text/tabwriter" + "time" + + "golang.org/x/text/message" +) + +// optionsSlice is a way to access the options in a thread safe way +type optionsSlice struct { + opts []*PerfTestOptions + mu sync.Mutex +} + +type perfRunner struct { + // ticker is the runner for giving updates every second + ticker *time.Ticker + done chan bool + + // name of the performance test + name string + + // the perf test, options, and transports being tested/used + perfToRun PerfMethods + allOptions optionsSlice + proxyTransports map[string]*RecordingHTTPClient + + // All created tests + tests []PerfTest + + // globalInstance is the single globalInstance for GlobalCleanup + globalInstance GlobalPerfTest + + // this is the previous prints total + warmupOperationStatusTracker int64 + operationStatusTracker int64 + + // writer and messagePrinter + w *tabwriter.Writer + messagePrinter *message.Printer + + // tracker for whether the warmup has finished + warmupFinished int32 + warmupPrinted bool +} + +func newPerfRunner(p PerfMethods, name string) *perfRunner { + warmupFinished, warmupPrinted := 0, false + if warmUpDuration == 0 { + warmupFinished, warmupPrinted = parallelInstances, true + } + return &perfRunner{ + ticker: time.NewTicker(time.Second), + done: make(chan bool), + name: name, + proxyTransports: map[string]*RecordingHTTPClient{}, + perfToRun: p, + operationStatusTracker: -1, + warmupOperationStatusTracker: -1, + w: tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight), + messagePrinter: 
message.NewPrinter(message.MatchLanguage("en")), + warmupFinished: int32(warmupFinished), + warmupPrinted: warmupPrinted, + } +} + +func (r *perfRunner) Run() error { + err := r.globalSetup() + if err != nil { + return err + } + defer func() { + err = r.globalInstance.GlobalCleanup(context.Background()) + if err != nil { + panic(err) + } + }() + + err = r.createPerfTests() + if err != nil { + return err + } + + r.ticker = time.NewTicker(time.Second) + + // Poller for printing + go func() { + for { + if r.ticker != nil { + select { + case <-r.done: + return + case <-r.ticker.C: + err := r.printStatus() + if err != nil { + panic(err) + } + } + } + } + }() + wg.Wait() + + r.done <- true + + err = r.printFinalUpdate(false) + if err != nil { + return err + } + return r.cleanup() +} + +// global setup by instantiating a single global instance +func (r *perfRunner) globalSetup() error { + globalInst, err := r.perfToRun.New(context.TODO(), newPerfTestOptions(r.name)) + if err != nil { + return err + } + r.globalInstance = globalInst + return nil +} + +// createPerfTests spins up `parallelInstances` (specified by --parallel flag) goroutines +func (r *perfRunner) createPerfTests() error { + var IDs []string + proxyURLS := parseProxyURLS() + + for idx := 0; idx < parallelInstances; idx++ { + ID := fmt.Sprintf("%s-%d", r.name, idx) + options := newPerfTestOptions(ID) + IDs = append(IDs, ID) + + if testProxyURLs != "" { + proxyURL := proxyURLS[idx%len(proxyURLS)] + transporter := NewProxyTransport(&TransportOptions{ + TestName: ID, + proxyURL: proxyURL, + }) + options.Transporter = transporter + r.proxyTransports[ID] = transporter + } else { + options.Transporter = defaultHTTPClient + } + options.parallelIndex = idx + + perfTest, err := r.globalInstance.NewPerfTest(context.TODO(), &options) + if err != nil { + return err + } + r.tests = append(r.tests, perfTest) + r.allOptions.mu.Lock() + r.allOptions.opts = append(r.allOptions.opts, &options) + r.allOptions.mu.Unlock() + } + + 
for idx, test := range r.tests { + wg.Add(1) + go r.runTest(test, idx, IDs[idx]) + } + return nil +} + +// cleanup runs the Cleanup on each of the r.tests +func (r *perfRunner) cleanup() error { + for _, t := range r.tests { + err := t.Cleanup(context.Background()) + if err != nil { + return err + } + } + return nil +} + +// print an update for the last second +func (r *perfRunner) printStatus() error { + if !r.warmupPrinted { + finishedWarmup := r.printWarmupStatus() + if !finishedWarmup { + return nil + } + } + + if r.operationStatusTracker == -1 { + err := r.printFinalUpdate(true) + if err != nil { + return err + } + r.operationStatusTracker = 0 + fmt.Fprintln(r.w, "\nCurrent\tTotal\tAverage\t") + } + totalOperations := r.totalOperations(false) + + _, err := fmt.Fprintf( + r.w, + "%s\t%s\t%s\t\n", + r.messagePrinter.Sprintf("%d", totalOperations-r.operationStatusTracker), + r.messagePrinter.Sprintf("%d", totalOperations), + r.messagePrinter.Sprintf("%.2f", r.opsPerSecond(false)), + ) + if err != nil { + return err + } + r.operationStatusTracker = totalOperations + return r.w.Flush() +} + +// return true if all warmup information has been printed +func (r *perfRunner) printWarmupStatus() bool { + if r.warmupOperationStatusTracker == -1 { + r.warmupOperationStatusTracker = 0 + fmt.Println("===== WARMUP =====") + fmt.Fprintln(r.w, "\nCurrent\tTotal\tAverage\t") + } + totalOperations := r.totalOperations(true) + + if r.warmupOperationStatusTracker == totalOperations { + return true + } + + _, err := fmt.Fprintf( + r.w, + "%s\t%s\t%s\t\n", + r.messagePrinter.Sprintf("%d", totalOperations-r.warmupOperationStatusTracker), + r.messagePrinter.Sprintf("%d", totalOperations), + r.messagePrinter.Sprintf("%.2f", r.opsPerSecond(true)), + ) + if err != nil { + panic(err) + } + r.warmupOperationStatusTracker = totalOperations + err = r.w.Flush() + if err != nil { + panic(err) + } + return false +} + +// totalOperations iterates over all options structs to get the number of 
operations completed +func (r *perfRunner) totalOperations(warmup bool) int64 { + var ret int64 + + r.allOptions.mu.Lock() + defer r.allOptions.mu.Unlock() + for _, opt := range r.allOptions.opts { + if warmup { + ret += atomic.LoadInt64(&opt.warmupCount) + } else { + ret += atomic.LoadInt64(&opt.runCount) + } + } + + return ret +} + +// opsPerSecond calculates the average number of operations per second +func (r *perfRunner) opsPerSecond(warmup bool) float64 { + var ret float64 + + r.allOptions.mu.Lock() + defer r.allOptions.mu.Unlock() + for _, opt := range r.allOptions.opts { + if warmup { + e := float64(atomic.LoadInt64((*int64)(&opt.warmupElapsed))) / float64(time.Second) + if e != 0 { + ret += float64(atomic.LoadInt64(&opt.warmupCount)) / e + } + } else { + e := float64(atomic.LoadInt64((*int64)(&opt.runElapsed))) / float64(time.Second) + if e != 0 { + ret += float64(atomic.LoadInt64(&opt.runCount)) / e + } + } + } + return ret +} + +// printFinalUpdate prints the final update for the warmup/test run +func (r *perfRunner) printFinalUpdate(warmup bool) error { + if r.warmupPrinted && warmup { + return nil + } + totalOperations := r.totalOperations(warmup) + opsPerSecond := r.opsPerSecond(warmup) + if opsPerSecond == 0.0 { + return fmt.Errorf("completed without generating operation statistics") + } + + secondsPerOp := 1.0 / opsPerSecond + weightedAvg := float64(totalOperations) / opsPerSecond + + if warmup { + fmt.Println("\n=== Warm Up Results ===") + } else { + fmt.Println("\n=== Results ===") + } + fmt.Printf( + "Completed %s operations in a weighted-average of %ss (%s ops/s, %s s/op)\n", + r.messagePrinter.Sprintf("%d", totalOperations), + r.messagePrinter.Sprintf("%.2f", weightedAvg), + r.messagePrinter.Sprintf("%.3f", opsPerSecond), + r.messagePrinter.Sprintf("%.3f", secondsPerOp), + ) + return nil +} + +// runTest takes care of the semantics of running a single iteration. 
+// It changes configuration on the proxy, increments counters, and
+// updates the running-time.
+func (r *perfRunner) runTest(p PerfTest, index int, ID string) {
+	defer wg.Done()
+	if debug {
+		log.Printf("number of proxies %d", len(r.proxyTransports))
+	}
+
+	r.allOptions.mu.Lock()
+	opts := r.allOptions.opts[index]
+	r.allOptions.mu.Unlock()
+
+	// If we are using the test proxy, we need to set up the in-memory recording.
+	if testProxyURLs != "" {
+		// First request goes through in Live mode
+		r.proxyTransports[ID].SetMode("live")
+		err := p.Run(context.Background())
+		// a failed live request is fatal: there is nothing for
+		// the proxy to record or play back afterwards
+		if err != nil {
+			panic(err)
+		}
+
+		// 2nd request goes through in Record mode
+		r.proxyTransports[ID].SetMode("record")
+		err = r.proxyTransports[ID].start()
+		// failing to start the recording is fatal
+		if err != nil {
+			panic(err)
+		}
+
+		err = p.Run(context.Background())
+		// a failed recording request is fatal: playback would
+		// replay an empty recording
+		if err != nil {
+			panic(err)
+		}
+		err = r.proxyTransports[ID].stop()
+		if err != nil {
+			panic(err)
+		}
+
+		// All ensuing requests go through in Playback mode
+		r.proxyTransports[ID].SetMode("playback")
+		err = r.proxyTransports[ID].start()
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	// true parameter indicates we're running the warmup here
+	err := r.runTestForDuration(p, opts, true)
+	if err != nil {
+		panic(err)
+	}
+
+	// record that this goroutine has finished its warmup
+	val := atomic.AddInt32(&r.warmupFinished, 1)
+	if debug {
+		fmt.Printf("finished %d warmups\n", val)
+	}
+
+	// run the actual test
+	err = r.runTestForDuration(p, opts, false)
+	if err != nil {
+		panic(err)
+	}
+
+	if testProxyURLs != "" {
+		// Stop the proxy now
+		err := r.proxyTransports[ID].stop()
+		if err != nil {
+			panic(err)
+		}
+		r.proxyTransports[ID].SetMode("live")
+	}
+	opts.finished = true
+}
+
+func (r *perfRunner) runTestForDuration(p PerfTest, opts *PerfTestOptions, warmup bool) error {
+	if warmup && warmUpDuration <= 0 {
+		return nil
+	}
+
+	// startPtr is our base time for keeping track of how long a
test has run + var startPtr *time.Time + if warmup { + t := time.Now() + opts.warmupStart = &t + startPtr = opts.warmupStart + } else { + t := time.Now() + opts.runStart = &t + startPtr = opts.runStart + } + + var runDuration int + if warmup { + runDuration = warmUpDuration + } else { + runDuration = duration + } + + var ctx context.Context + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), time.Second*time.Duration(runDuration)) + defer cancel() + + lastSavedTime := time.Now() + for time.Since(*startPtr).Seconds() < float64(runDuration) { + err := p.Run(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + break + } else { + return err + } + } + opts.increment(warmup) + + if time.Since(lastSavedTime).Seconds() > 0.3 { + duration := time.Since(*startPtr) + if warmup { + atomic.StoreInt64((*int64)(&opts.warmupElapsed), int64(duration)) + } else { + atomic.StoreInt64((*int64)(&opts.runElapsed), int64(duration)) + } + lastSavedTime = time.Now() + } + } + + duration := time.Since(*startPtr) + if warmup { + atomic.StoreInt64((*int64)(&opts.warmupElapsed), int64(duration)) + } else { + atomic.StoreInt64((*int64)(&opts.runElapsed), int64(duration)) + } + + return nil +} diff --git a/sdk/internal/perf/perf_test.go b/sdk/internal/perf/perf_test.go index c52d77fb4ccd..f10239e30c5d 100644 --- a/sdk/internal/perf/perf_test.go +++ b/sdk/internal/perf/perf_test.go @@ -58,20 +58,15 @@ func TestRun(t *testing.T) { warmUpDuration = 0 parallelInstances = 1 - err := runPerfTest("Sleep", NewNoOpTest) + runner := newPerfRunner(PerfMethods{New: NewNoOpTest, Register: nil}, "NoOpTest") + err := runner.Run() require.NoError(t, err) - - require.True(t, globalCleanup) - require.True(t, newnooptest) - require.True(t, newperftest) - require.True(t, run) - require.True(t, cleanup) } func TestParseProxyURLs(t *testing.T) { testProxyURLs = "" result := parseProxyURLS() - require.Nil(t, result) + require.Equal(t, 0, len(result)) 
testProxyURLs = "https://localhost:5001" result = parseProxyURLS() diff --git a/sdk/internal/perf/random_stream.go b/sdk/internal/perf/random_stream.go index 571ee8dce068..f1385ec3da34 100644 --- a/sdk/internal/perf/random_stream.go +++ b/sdk/internal/perf/random_stream.go @@ -22,6 +22,13 @@ type randomStream struct { remaining int } +func min(a, b int) int { + if a < b { + return a + } + return b +} + func (r *randomStream) Read(p []byte) (int, error) { if r.remaining == 0 { return 0, io.EOF diff --git a/sdk/internal/perf/status.go b/sdk/internal/perf/status.go deleted file mode 100644 index a160b0a66574..000000000000 --- a/sdk/internal/perf/status.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package perf - -import ( - "fmt" - "os" - "text/tabwriter" - "time" - - "golang.org/x/text/message" -) - -var messagePrinter *message.Printer = message.NewPrinter(message.MatchLanguage("en")) - -// statusRunner is the struct responsible for handling messages -type statusRunner struct { - // results is a slice of all results from the goroutines - results []runResult - - // start is the time the statusRunner was started - start time.Time - - // perRoutineResults map the parallel index to a slice of runResults - perRoutineResults map[int][]runResult - - // lastPrint holds when the last information was printed to stdout - // the initial value is the same as start. When this value exceeds - // is more than 1 second after time.Now(), a new update is printed. 
- lastPrint time.Time - - // total is a running count of the count of performance tests run - total int - - // prevTotal is the total of the last output - prevTotal int - - // hasFinished indicates if the final results have been printed out - totalRunTime int - - // routinesFinished indicates how many routines have sent a message - // indicating they have completed execution - routinesFinished int - - // isWarmup indicates whether the messages are from warmup - isWarmup bool -} - -func newStatusRunner(t time.Time, runTime int) *statusRunner { - return &statusRunner{ - results: make([]runResult, 0), - start: t, - perRoutineResults: map[int][]runResult{}, - lastPrint: t, - totalRunTime: runTime, - } -} - -func (s *statusRunner) handleMessage(msg runResult, w *tabwriter.Writer) { - s.results = append(s.results, msg) - - if msg.completed { - s.routinesFinished += 1 - } - - s.total += msg.count - - s.perRoutineResults[msg.parallelIndex] = append(s.perRoutineResults[msg.parallelIndex], msg) -} - -func (s *statusRunner) printUpdates() { - w := tabwriter.NewWriter(os.Stdout, 16, 8, 1, ' ', tabwriter.AlignRight) - firstPrint := false - for s.routinesFinished != parallelInstances { - // Poll and print - if time.Since(s.lastPrint).Seconds() > 1.0 { - - if !firstPrint { - if s.isWarmup { - fmt.Println("\n=== Warm Up ===") - } else { - fmt.Println("\n=== Test ===") - } - fmt.Fprintln(w, "Current\tTotal\tAverage\t") - w.Flush() - firstPrint = true - } - - avg := float64(s.total) / time.Since(s.start).Seconds() - _, err := fmt.Fprintf( - w, - "%s\t%s\t%s\t\n", - messagePrinter.Sprintf("%d", s.total-s.prevTotal), - messagePrinter.Sprintf("%d", s.total), - messagePrinter.Sprintf("%.2f", avg), - ) - if err != nil { - panic(err) - } - - w.Flush() - - s.lastPrint = time.Now() - s.prevTotal = s.total - } - } -} - -func (s *statusRunner) printFinalUpdate() { - opsPerRoutine := make([]int, parallelInstances) - secondsPerRoutine := make([]float64, parallelInstances) - - for pIdx, msgs := 
range s.perRoutineResults { - secondsPerRoutine[pIdx] = msgs[len(msgs)-1].timeInSeconds - for _, msg := range msgs { - opsPerRoutine[pIdx] += msg.count - } - } - - opsPerSecond := 0.0 - for i := 0; i < parallelInstances; i++ { - opsPerSecond += float64(opsPerRoutine[i]) / secondsPerRoutine[i] - } - - fmt.Println("\n=== Results ===") - secondsPerOp := 1.0 / opsPerSecond - weightedAvgSec := float64(s.total) / opsPerSecond - fmt.Printf( - "Completed %s operations in a weighted-average of %ss (%s ops/s, %s s/op)\n", - messagePrinter.Sprintf("%d", s.total), - messagePrinter.Sprintf("%.2f", weightedAvgSec), - messagePrinter.Sprintf("%.2f", opsPerSecond), - messagePrinter.Sprintf("%.3f", secondsPerOp), - ) -} diff --git a/sdk/internal/perf/testdata/perf/sleep_perf.go b/sdk/internal/perf/testdata/perf/sleep_perf.go index 271794ae5030..7056d0083fbd 100644 --- a/sdk/internal/perf/testdata/perf/sleep_perf.go +++ b/sdk/internal/perf/testdata/perf/sleep_perf.go @@ -58,8 +58,13 @@ func (g *globalSleepPerfTest) NewPerfTest(ctx context.Context, options *perf.Per } func (s *sleepPerfTest) Run(ctx context.Context) error { - time.Sleep(s.sleepInterval) - s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds())*sleepTestOpts.iterationGrowthFactor) * time.Nanosecond + select { + case <-ctx.Done(): + return nil + default: + time.Sleep(s.sleepInterval) + s.sleepInterval = time.Duration(float64(s.sleepInterval.Nanoseconds()) * sleepTestOpts.iterationGrowthFactor) + } return nil } diff --git a/sdk/internal/perf/utils.go b/sdk/internal/perf/utils.go deleted file mode 100644 index 9905cd4b1037..000000000000 --- a/sdk/internal/perf/utils.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. 
- -package perf - -// Helper function to sum a slice of integers -func sumInts(ints []int) int { - var ret int - for _, i := range ints { - ret += i - } - return ret -} - -func min(a, b int) int { - if a < b { - return a - } - return b -}