From d2761c8dd195f09fda87fc60511bfe8e265c7dd2 Mon Sep 17 00:00:00 2001
From: Dean Coakley
Date: Thu, 8 Sep 2022 15:00:33 +0100
Subject: [PATCH] Change advanced metrics socket default location (#32)

* Move default location of advanced metrics socket

* Fix typos. Lint

* Update docs

* Restrict socket permissions
---
 src/core/config/config.go                            |  1 -
 src/core/config/defaults.go                          |  2 +-
 src/extensions/advanced-metrics/README.md            |  8 ++++----
 src/extensions/advanced-metrics/reader/reader.go     | 12 +++++-------
 .../tables/priority_table/priority_table.go          |  2 +-
 .../advanced-metrics/tables/schema/field.go          | 10 +++++-----
 .../advanced-metrics/tables/schema/schema.go         |  2 +-
 src/plugins/advanced_metrics.go                      | 10 +++++-----
 src/plugins/advanced_metrics_test.go                 |  4 ++--
 .../nginx/agent/v2/src/core/config/config.go         |  1 -
 .../nginx/agent/v2/src/core/config/defaults.go       |  2 +-
 .../src/extensions/advanced-metrics/reader/reader.go | 12 +++++-------
 .../tables/priority_table/priority_table.go          |  2 +-
 .../advanced-metrics/tables/schema/field.go          | 10 +++++-----
 .../advanced-metrics/tables/schema/schema.go         |  2 +-
 .../nginx/agent/v2/src/plugins/advanced_metrics.go   | 10 +++++-----
 16 files changed, 42 insertions(+), 48 deletions(-)

diff --git a/src/core/config/config.go b/src/core/config/config.go
index 83d92071f..cdc988afc 100644
--- a/src/core/config/config.go
+++ b/src/core/config/config.go
@@ -345,7 +345,6 @@ func LoadPropertiesFromFile(cfg string) error {
 	// Get dynamic file, if it doesn't exist create it.
 	file, err := os.Stat(dynamicCfgPath)
-
 	if err != nil {
 		log.Warnf("Unable to read dynamic config (%s), got the following error: %v", dynamicCfgPath, err)
 	}
diff --git a/src/core/config/defaults.go b/src/core/config/defaults.go
index d1a10b2b8..2ecf4a2b5 100644
--- a/src/core/config/defaults.go
+++ b/src/core/config/defaults.go
@@ -67,7 +67,7 @@ var (
 			Mode: "aggregation",
 		},
 		AdvancedMetrics: AdvancedMetrics{
-			SocketPath:        "/tmp/advanced-metrics.sock",
+			SocketPath:        "/var/run/nginx-agent/advanced-metrics.sock",
 			AggregationPeriod: time.Second * 10,
 			PublishingPeriod:  time.Second * 30,
 			TableSizesLimits: advanced_metrics.TableSizesLimits{
diff --git a/src/extensions/advanced-metrics/README.md b/src/extensions/advanced-metrics/README.md
index e1c132eb7..9f674fca0 100644
--- a/src/extensions/advanced-metrics/README.md
+++ b/src/extensions/advanced-metrics/README.md
@@ -20,7 +20,7 @@ need to be provided. nginx-agent.conf snippet for advanced metrics showing defau
 server: ...
 tls: ...
 advanced_metrics:
-  socket_path: /tmp/acm.sock
+  socket_path: /var/run/nginx-agent/advanced-metrics.sock
   aggregation_period: 1s
   publishing_period: 3s
   table_sizes_limits:
@@ -32,7 +32,7 @@ advanced_metrics:
 #### Parameter Definitions:
 |Parameter| Description|
 | ----------- | ----------- |
-socket_path| Full os filepath to the unix socket which Nginx+ andAgent use to comunicate.
+socket_path| Full os filepath to the unix socket which Nginx+ and Agent use to communicate.
 aggregation_period| Frequency at which data in priority tables are aggregated to conserve space prior to publishing.
 publishing_period| Frequency at which data in priority tables is published to Management Plane.
 table_sizes_limits|staging_table_max_size| Max number of records allowed within any single aggregation period.staging_table_threshold | When the number of records reaches this threshold, data aggregation starts to keep number of records within the staging_table_max_size limit. **staging_table_threshold ≤ staging_table_max_size**.
@@ -124,9 +124,9 @@ schema.NewSchemaBuilder().
 This example defines that advanced metrics is able to receive messages with 5 fields and only 5 fields where:
 - 1st is dimension with `dim1` name, this name will be used by `Publisher` to set `MetricsSet.Dimensions.Name` value, and cardinality 100, which means that up to 100 different possible dimensions values will be collected in single `PublishingPeriod`,
-- 2nd same as above but this `dim2` dimension additionally specifies CollapsingLevel, which should be a percent more [here](./pkg/shema_builder.go)
+- 2nd same as above but this `dim2` dimension additionally specifies CollapsingLevel, which should be a percent more [here](./pkg/schema/schema_builder.go)
 - 3th dimension which is integer dimensions, so value of dimensions will be converted into integer and IT'S value will be used as a lookup code, this is optimization which save space in lookup tables and stores its value in key itself rather than keeping string representation of integers
-- 4th and 5th are metrics same as with dimenisons this name will be used in `MetricsSet` struct, metrics does not contain any additional options
+- 4th and 5th are metrics same as with dimensions this name will be used in `MetricsSet` struct, metrics does not contain any additional options
 
 ### Advanced Metrics
diff --git a/src/extensions/advanced-metrics/reader/reader.go b/src/extensions/advanced-metrics/reader/reader.go
index c7cd8fbe3..ddd8ea40e 100644
--- a/src/extensions/advanced-metrics/reader/reader.go
+++ b/src/extensions/advanced-metrics/reader/reader.go
@@ -4,12 +4,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"golang.org/x/sync/errgroup"
 	"net"
 	"os"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
 )
 
 //go:generate mockgen -source reader.go -destination mocks/reader_mock.go -package mocks
@@ -67,12 +67,9 @@ func NewReader(address string) *Reader {
 func newReader(address string, listenerConfig ListenerConfig, frameChannel chan Frame, newWorker NewWorkerConstructor) *Reader {
 	return &Reader{
 		listenerConfig: listenerConfig,
-
-		address: address,
-
-		newWorker: newWorker,
-
-		frameChannel: frameChannel,
+		address:        address,
+		newWorker:      newWorker,
+		frameChannel:   frameChannel,
 	}
 }
 
@@ -151,6 +148,7 @@ func (r *Reader) runWorker(ctx context.Context, connection net.Conn, id int) {
 
 func (r *Reader) checkSocketAndCleanup() error {
 	log.Info("Checking availability of unix socket")
+
 	if _, err := os.Stat(r.address); err == nil {
 		err = os.Remove(r.address)
 		if err != nil {
diff --git a/src/extensions/advanced-metrics/tables/priority_table/priority_table.go b/src/extensions/advanced-metrics/tables/priority_table/priority_table.go
index 0d1f8d712..cdb309fb8 100644
--- a/src/extensions/advanced-metrics/tables/priority_table/priority_table.go
+++ b/src/extensions/advanced-metrics/tables/priority_table/priority_table.go
@@ -97,7 +97,7 @@ func (p *PriorityTable) shouldCollapseSamples() bool {
 func (p *PriorityTable) collapseSample(sample *sample.Sample, currentCollapseLevel limits.CollapsingLevel) {
 	for _, dim := range p.schema.Dimensions() {
 		if dim.ShouldCollapse(currentCollapseLevel) {
-			sample.Key().SetKeyPart(lookup.LookupAggrCode, dim.KeyBitSize, dim.KeyBitPositionInCompoudKey)
+			sample.Key().SetKeyPart(lookup.LookupAggrCode, dim.KeyBitSize, dim.KeyBitPositionInCompoundKey)
 		}
 	}
 }
diff --git a/src/extensions/advanced-metrics/tables/schema/field.go b/src/extensions/advanced-metrics/tables/schema/field.go
index 50e86e29b..f15abf5f1 100644
--- a/src/extensions/advanced-metrics/tables/schema/field.go
+++ b/src/extensions/advanced-metrics/tables/schema/field.go
@@ -30,11 +30,11 @@ type Field struct {
 // MaxDimensionSetSize specifies max unique dimension which will be stored in staging table
 // if max size will be reaches all new unique dimensions will be transformed to AGGR value
 type DimensionField struct {
-	KeyBitSize                 int
-	KeyBitPositionInCompoudKey int
-	MaxDimensionSetSize        uint32
-	Transform                  *DimensionTransformFunction
-	CollapsingLevel            *limits.CollapsingLevel
+	KeyBitSize                  int
+	KeyBitPositionInCompoundKey int
+	MaxDimensionSetSize         uint32
+	Transform                   *DimensionTransformFunction
+	CollapsingLevel             *limits.CollapsingLevel
 }
 
 type FieldOption func(f *Field)
diff --git a/src/extensions/advanced-metrics/tables/schema/schema.go b/src/extensions/advanced-metrics/tables/schema/schema.go
index 2c9fb592c..b8f403c83 100644
--- a/src/extensions/advanced-metrics/tables/schema/schema.go
+++ b/src/extensions/advanced-metrics/tables/schema/schema.go
@@ -24,7 +24,7 @@ func NewSchema(fields ...*Field) *Schema {
 		if f.Type == FieldTypeDimension {
 			dims = append(dims, f)
 
-			f.KeyBitPositionInCompoudKey = keyBits
+			f.KeyBitPositionInCompoundKey = keyBits
 			keyBits += f.KeyBitSize
 			f.index = dimensionIndex
 			dimensionIndex++
diff --git a/src/plugins/advanced_metrics.go b/src/plugins/advanced_metrics.go
index 3b978b03c..e6076965f 100644
--- a/src/plugins/advanced_metrics.go
+++ b/src/plugins/advanced_metrics.go
@@ -202,7 +202,7 @@ func (m *AdvancedMetrics) run() {
 	if err != nil {
 		log.Error("App centric metric plugin failed to change socket permissions")
 	}
-	commonDimmensions := append(m.commonDims.ToDimensions(), &proto.Dimension{
+	commonDimensions := append(m.commonDims.ToDimensions(), &proto.Dimension{
 		Name:  aggregationDurationDimension,
 		Value: strconv.Itoa(int(m.cfg.PublishingPeriod.Seconds())),
 	})
@@ -214,7 +214,7 @@ func (m *AdvancedMetrics) run() {
 				return
 			}
 			now := types.TimestampNow()
-			m.pipeline.Process(core.NewMessage(core.CommMetrics, []core.Payload{toMetricReport(mr, now, commonDimmensions)}))
+			m.pipeline.Process(core.NewMessage(core.CommMetrics, []core.Payload{toMetricReport(mr, now, commonDimensions)}))
 		case <-m.pipeline.Context().Done():
 			return
 		}
@@ -229,7 +229,7 @@ func enableWritePermissionForSocket(path string) error {
 		case <-timeout:
 			return lastError
 		default:
-			lastError = os.Chmod(path, 0774)
+			lastError = os.Chmod(path, 0660)
 			if lastError == nil {
 				return nil
 			}
@@ -238,7 +238,7 @@ func enableWritePermissionForSocket(path string) error {
 	}
 }
 
-func toMetricReport(set []*publisher.MetricSet, now *types.Timestamp, commonDimmensions []*proto.Dimension) *proto.MetricsReport {
+func toMetricReport(set []*publisher.MetricSet, now *types.Timestamp, commonDimensions []*proto.Dimension) *proto.MetricsReport {
 	mr := &proto.MetricsReport{
 		Meta: &proto.Metadata{Timestamp: now},
 		Type: proto.MetricsReport_INSTANCE,
@@ -249,7 +249,7 @@ func toMetricReport(set []*publisher.MetricSet, now *types.Timestamp, commonDimm
 		statsEntity := proto.StatsEntity{
 			Timestamp:     now,
 			Simplemetrics: make([]*proto.SimpleMetric, 0, len(s.Metrics)*4),
-			Dimensions:    commonDimmensions,
+			Dimensions:    commonDimensions,
 		}
 
 		isStreamMetric := false
diff --git a/src/plugins/advanced_metrics_test.go b/src/plugins/advanced_metrics_test.go
index 89c4e27c3..ff9216b4b 100644
--- a/src/plugins/advanced_metrics_test.go
+++ b/src/plugins/advanced_metrics_test.go
@@ -186,7 +186,7 @@ func TestAppCentricMetric_toMetricReport(t *testing.T) {
 	}, report)
 }
 
-func TestAppCenticMetricClose(t *testing.T) {
+func TestAppCentricMetricClose(t *testing.T) {
 	env := tutils.GetMockEnv()
 
 	pluginUnderTest := NewAdvancedMetrics(env, &config.Config{})
@@ -201,7 +201,7 @@ func TestAppCenticMetricClose(t *testing.T) {
 	env.AssertExpectations(t)
 }
 
-func TestAppCenticMetricSubscriptions(t *testing.T) {
+func TestAppCentricMetricSubscriptions(t *testing.T) {
 	pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{})
 	assert.Equal(t, []string{}, pluginUnderTest.Subscriptions())
 }
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go
index 83d92071f..cdc988afc 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go
@@ -345,7 +345,6 @@ func LoadPropertiesFromFile(cfg string) error {
 	// Get dynamic file, if it doesn't exist create it.
 	file, err := os.Stat(dynamicCfgPath)
-
 	if err != nil {
 		log.Warnf("Unable to read dynamic config (%s), got the following error: %v", dynamicCfgPath, err)
 	}
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go
index d1a10b2b8..2ecf4a2b5 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go
@@ -67,7 +67,7 @@ var (
 			Mode: "aggregation",
 		},
 		AdvancedMetrics: AdvancedMetrics{
-			SocketPath:        "/tmp/advanced-metrics.sock",
+			SocketPath:        "/var/run/nginx-agent/advanced-metrics.sock",
 			AggregationPeriod: time.Second * 10,
 			PublishingPeriod:  time.Second * 30,
 			TableSizesLimits: advanced_metrics.TableSizesLimits{
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/reader/reader.go b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/reader/reader.go
index c7cd8fbe3..ddd8ea40e 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/reader/reader.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/reader/reader.go
@@ -4,12 +4,12 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"golang.org/x/sync/errgroup"
 	"net"
 	"os"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
 )
 
 //go:generate mockgen -source reader.go -destination mocks/reader_mock.go -package mocks
@@ -67,12 +67,9 @@ func NewReader(address string) *Reader {
 func newReader(address string, listenerConfig ListenerConfig, frameChannel chan Frame, newWorker NewWorkerConstructor) *Reader {
 	return &Reader{
 		listenerConfig: listenerConfig,
-
-		address: address,
-
-		newWorker: newWorker,
-
-		frameChannel: frameChannel,
+		address:        address,
+		newWorker:      newWorker,
+		frameChannel:   frameChannel,
 	}
 }
 
@@ -151,6 +148,7 @@ func (r *Reader) runWorker(ctx context.Context, connection net.Conn, id int) {
 
 func (r *Reader) checkSocketAndCleanup() error {
 	log.Info("Checking availability of unix socket")
+
 	if _, err := os.Stat(r.address); err == nil {
 		err = os.Remove(r.address)
 		if err != nil {
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/priority_table/priority_table.go b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/priority_table/priority_table.go
index 0d1f8d712..cdb309fb8 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/priority_table/priority_table.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/priority_table/priority_table.go
@@ -97,7 +97,7 @@ func (p *PriorityTable) shouldCollapseSamples() bool {
 func (p *PriorityTable) collapseSample(sample *sample.Sample, currentCollapseLevel limits.CollapsingLevel) {
 	for _, dim := range p.schema.Dimensions() {
 		if dim.ShouldCollapse(currentCollapseLevel) {
-			sample.Key().SetKeyPart(lookup.LookupAggrCode, dim.KeyBitSize, dim.KeyBitPositionInCompoudKey)
+			sample.Key().SetKeyPart(lookup.LookupAggrCode, dim.KeyBitSize, dim.KeyBitPositionInCompoundKey)
 		}
 	}
 }
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/field.go b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/field.go
index 50e86e29b..f15abf5f1 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/field.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/field.go
@@ -30,11 +30,11 @@ type Field struct {
 // MaxDimensionSetSize specifies max unique dimension which will be stored in staging table
 // if max size will be reaches all new unique dimensions will be transformed to AGGR value
 type DimensionField struct {
-	KeyBitSize                 int
-	KeyBitPositionInCompoudKey int
-	MaxDimensionSetSize        uint32
-	Transform                  *DimensionTransformFunction
-	CollapsingLevel            *limits.CollapsingLevel
+	KeyBitSize                  int
+	KeyBitPositionInCompoundKey int
+	MaxDimensionSetSize         uint32
+	Transform                   *DimensionTransformFunction
+	CollapsingLevel             *limits.CollapsingLevel
 }
 
 type FieldOption func(f *Field)
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/schema.go b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/schema.go
index 2c9fb592c..b8f403c83 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/schema.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/extensions/advanced-metrics/tables/schema/schema.go
@@ -24,7 +24,7 @@ func NewSchema(fields ...*Field) *Schema {
 		if f.Type == FieldTypeDimension {
 			dims = append(dims, f)
 
-			f.KeyBitPositionInCompoudKey = keyBits
+			f.KeyBitPositionInCompoundKey = keyBits
 			keyBits += f.KeyBitSize
 			f.index = dimensionIndex
 			dimensionIndex++
diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go
index 3b978b03c..e6076965f 100644
--- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go
+++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go
@@ -202,7 +202,7 @@ func (m *AdvancedMetrics) run() {
 	if err != nil {
 		log.Error("App centric metric plugin failed to change socket permissions")
 	}
-	commonDimmensions := append(m.commonDims.ToDimensions(), &proto.Dimension{
+	commonDimensions := append(m.commonDims.ToDimensions(), &proto.Dimension{
 		Name:  aggregationDurationDimension,
 		Value: strconv.Itoa(int(m.cfg.PublishingPeriod.Seconds())),
 	})
@@ -214,7 +214,7 @@ func (m *AdvancedMetrics) run() {
 				return
 			}
 			now := types.TimestampNow()
-			m.pipeline.Process(core.NewMessage(core.CommMetrics, []core.Payload{toMetricReport(mr, now, commonDimmensions)}))
+			m.pipeline.Process(core.NewMessage(core.CommMetrics, []core.Payload{toMetricReport(mr, now, commonDimensions)}))
 		case <-m.pipeline.Context().Done():
 			return
 		}
@@ -229,7 +229,7 @@ func enableWritePermissionForSocket(path string) error {
 		case <-timeout:
 			return lastError
 		default:
-			lastError = os.Chmod(path, 0774)
+			lastError = os.Chmod(path, 0660)
 			if lastError == nil {
 				return nil
 			}
@@ -238,7 +238,7 @@ func enableWritePermissionForSocket(path string) error {
 	}
 }
 
-func toMetricReport(set []*publisher.MetricSet, now *types.Timestamp, commonDimmensions []*proto.Dimension) *proto.MetricsReport {
+func toMetricReport(set []*publisher.MetricSet, now *types.Timestamp, commonDimensions []*proto.Dimension) *proto.MetricsReport {
 	mr := &proto.MetricsReport{
 		Meta: &proto.Metadata{Timestamp: now},
 		Type: proto.MetricsReport_INSTANCE,
@@ -249,7 +249,7 @@ func toMetricReport(set []*publisher.MetricSet, now *types.Timestamp, commonDimm
 		statsEntity := proto.StatsEntity{
 			Timestamp:     now,
 			Simplemetrics: make([]*proto.SimpleMetric, 0, len(s.Metrics)*4),
-			Dimensions:    commonDimmensions,
+			Dimensions:    commonDimensions,
 		}
 
 		isStreamMetric := false
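For illustration only, not part of the patch itself: a minimal standalone Go sketch of the socket lifecycle this change relies on. It removes a stale socket file, listens on the new default path /var/run/nginx-agent/advanced-metrics.sock, and tightens permissions to 0660 so that only the socket's owner and group can write to it. The socketPath constant and the main wrapper are assumptions made for the sketch; the real logic lives in checkSocketAndCleanup (reader.go) and enableWritePermissionForSocket (advanced_metrics.go), and error handling there is more involved.

// socket_sketch.go - illustrative only; requires /var/run/nginx-agent to exist
// and sufficient privileges to create a socket there.
package main

import (
	"log"
	"net"
	"os"
)

// New default location introduced by this patch (see defaults.go).
const socketPath = "/var/run/nginx-agent/advanced-metrics.sock"

func main() {
	// Remove a socket file left over from a previous run, as
	// checkSocketAndCleanup does before listening.
	if _, err := os.Stat(socketPath); err == nil {
		if err := os.Remove(socketPath); err != nil {
			log.Fatalf("unable to remove stale socket: %v", err)
		}
	}

	listener, err := net.Listen("unix", socketPath)
	if err != nil {
		log.Fatalf("unable to listen on %s: %v", socketPath, err)
	}
	defer listener.Close()

	// Restrict the socket to owner/group read-write (0660), matching the
	// permission change from 0774 made in enableWritePermissionForSocket.
	if err := os.Chmod(socketPath, 0660); err != nil {
		log.Fatalf("unable to restrict socket permissions: %v", err)
	}

	// Accept a single connection to show the socket is usable.
	conn, err := listener.Accept()
	if err != nil {
		log.Fatalf("accept failed: %v", err)
	}
	defer conn.Close()
}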