Skip to content

Commit

Permalink
Fix metrics concurrency (#1312)
Browse files Browse the repository at this point in the history
* Fix metrics concurrency

Signed-off-by: Artem Glazychev <[email protected]>

* Add unit test

Signed-off-by: Artem Glazychev <[email protected]>

* fix type alias

Signed-off-by: Artem Glazychev <[email protected]>
  • Loading branch information
glazychev-art authored Jun 16, 2022
1 parent 82a3e5a commit 1bc5125
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 7 deletions.
33 changes: 33 additions & 0 deletions pkg/networkservice/common/metrics/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"

"go.opentelemetry.io/otel/metric"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type keyType struct{}
type metricsMap = map[string]metric.Int64Histogram

func loadOrStore(ctx context.Context, metrics metricsMap) (value metricsMap, ok bool) {
rawValue, ok := metadata.Map(ctx, false).LoadOrStore(keyType{}, metrics)
return rawValue.(metricsMap), ok
}
13 changes: 6 additions & 7 deletions pkg/networkservice/common/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ import (
)

type metricServer struct {
recorderMap map[string]metric.Int64Histogram
meter metric.Meter
meter metric.Meter
}

// NewServer returns a new metric server chain element
func NewServer() networkservice.NetworkServiceServer {
return &metricServer{
recorderMap: make(map[string]metric.Int64Histogram),
meter: global.Meter(""),
meter: global.Meter(""),
}
}

Expand Down Expand Up @@ -70,19 +68,20 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa
continue
}

metrics, _ := loadOrStore(ctx, make(map[string]metric.Int64Histogram))
for metricName, metricValue := range pathSegment.Metrics {
/* Works with integers only */
recVal, err := strconv.ParseInt(metricValue, 10, 64)
if err != nil {
continue
}
_, ok := t.recorderMap[metricName]
_, ok := metrics[metricName]
if !ok {
t.recorderMap[metricName] = metric.Must(t.meter).NewInt64Histogram(
metrics[metricName] = metric.Must(t.meter).NewInt64Histogram(
pathSegment.Name + "_" + metricName,
)
}
t.recorderMap[metricName].Record(ctx, recVal, attribute.String("connection", path.GetPathSegments()[0].Id))
metrics[metricName].Record(ctx, recVal, attribute.String("connection", path.GetPathSegments()[0].Id))
}
}
}
Expand Down
77 changes: 77 additions & 0 deletions pkg/networkservice/common/metrics/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics_test

import (
"context"

"math/rand"
"strconv"
"testing"

"go.uber.org/goleak"

"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/require"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/metrics"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

func TestMetrics_Concurrency(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

server := chain.NewNetworkServiceServer(
begin.NewServer(),
metadata.NewServer(),
updatepath.NewServer("testServer"),
&metricsGeneratorServer{},
metrics.NewServer(),
)
for i := 0; i < 100; i++ {
go func(i int) {
req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{Id: "nsc-" + strconv.Itoa(i)},
}
_, err := server.Request(context.Background(), req)
require.NoError(t, err)
}(i)
}
}

type metricsGeneratorServer struct{}

func (s *metricsGeneratorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
segment := request.GetConnection().GetPath().GetPathSegments()[0]
if segment.Metrics == nil {
segment.Metrics = make(map[string]string)
}
// Generate any random metric value
// nolint:gosec
segment.Metrics["testMetric"] = strconv.Itoa(rand.Intn(100))
return next.Server(ctx).Request(ctx, request)
}

func (s *metricsGeneratorServer) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
return next.Server(ctx).Close(ctx, connection)
}

0 comments on commit 1bc5125

Please sign in to comment.