diff --git a/Makefile b/Makefile index 301a2f7296..fa86aa7d9a 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ BUILD_INFO_IMPORT_PATH=go.opentelemetry.io/collector/internal/version all: gotidy test build test: - for dir in $(GODIRS); do (cd $${dir}; $(GOCMD) test --tags=assert ./...) || exit 1; done + for dir in $(GODIRS); do (cd $${dir}; $(GOCMD) test -parallel=4 -race --tags=assert ./...) || exit 1; done fmt: for dir in $(GODIRS); do (cd $${dir}; $(GOCMD) fmt ./...) || exit 1; done diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 5a018596d5..b897281b67 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -106,7 +106,9 @@ type shard struct { pending []pendingItem - totalSent int + // totalSent is a monotonic count of items used to match + // request and response. + totalSent int64 tracer trace.TracerProvider } @@ -323,28 +325,29 @@ func (b *shard) resetTimer() { } func (b *shard) sendItems(trigger trigger) { - sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + split, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) bytes := int64(b.batch.sizeBytes(req)) + toSend := int64(split) var waiters []chan error var countItems []int var contexts []context.Context numItemsBefore := b.totalSent - numItemsAfter := b.totalSent + sent + numItemsAfter := b.totalSent + toSend // The current batch can contain items from several different producers. Ensure each producer gets a response back. for len(b.pending) > 0 && numItemsBefore < numItemsAfter { // Waiter only had some items in the current batch - if numItemsBefore+b.pending[0].numItems > numItemsAfter { - partialSent := numItemsAfter - numItemsBefore + if numItemsBefore+int64(b.pending[0].numItems) > numItemsAfter { + partialSent := int(numItemsAfter - numItemsBefore) b.pending[0].numItems -= partialSent - numItemsBefore += partialSent + numItemsBefore += int64(partialSent) waiters = append(waiters, b.pending[0].respCh) contexts = append(contexts, b.pending[0].parentCtx) countItems = append(countItems, partialSent) } else { // waiter gets a complete response. - numItemsBefore += b.pending[0].numItems + numItemsBefore += int64(b.pending[0].numItems) waiters = append(waiters, b.pending[0].respCh) contexts = append(contexts, b.pending[0].parentCtx) countItems = append(countItems, b.pending[0].numItems) @@ -389,7 +392,7 @@ func (b *shard) sendItems(trigger trigger) { if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) } else { - b.processor.telemetry.record(latency, trigger, int64(sent), bytes) + b.processor.telemetry.record(latency, trigger, toSend, bytes) } }() @@ -457,6 +460,10 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error { item.count = telem.LogRecordCount() } + if item.count == 0 { + return nil + } + bytes := int64(b.batch.sizeBytes(data)) if bytes > b.processor.limitBytes { diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 8abd3042cb..059eb28c81 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -134,9 +134,9 @@ func TestBatchProcessorSpansPanicRecover(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { - err = bp.ConsumeTraces(context.Background(), td) + defer wg.Done() + err := bp.ConsumeTraces(context.Background(), td) assert.Contains(t, err.Error(), "testing panic") - wg.Done() }() } @@ -168,9 +168,9 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) { md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty()) wg.Add(1) go func() { - err = bp.ConsumeMetrics(context.Background(), md) + defer wg.Done() + err := bp.ConsumeMetrics(context.Background(), md) assert.Contains(t, err.Error(), "testing panic") - wg.Done() }() } @@ -202,9 +202,9 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) { ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) wg.Add(1) go func() { - err = bp.ConsumeLogs(context.Background(), ld) + defer wg.Done() + err := bp.ConsumeLogs(context.Background(), ld) assert.Contains(t, err.Error(), "testing panic") - wg.Done() }() } @@ -307,9 +307,9 @@ func TestBatchProcessorCancelContext(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { - err = bp.ConsumeTraces(ctx, td) + defer wg.Done() + err := bp.ConsumeTraces(ctx, td) assert.Contains(t, err.Error(), "context canceled") - wg.Done() }() } @@ -389,8 +389,8 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, bp.ConsumeTraces(bg, td)) - wg.Done() }() } wg.Wait() @@ -481,8 +481,8 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, bp.ConsumeTraces(callCtxs[num], td)) - wg.Done() }() } wg.Wait() @@ -528,8 +528,8 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() } @@ -537,8 +537,8 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { td := ptrace.NewTraces() wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() wg.Wait() @@ -580,8 +580,8 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { } wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() } @@ -589,8 +589,8 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { td := ptrace.NewTraces() wg.Add(1) go func() { + defer wg.Done() require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() // shutdown will flush any remaining spans @@ -633,8 +633,8 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry) { sizeSum += sizer.TracesSize(td) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() } @@ -694,8 +694,8 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) { // this should be a noerr but need to separate triggerTimeout from triggerShutdown wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() } @@ -738,8 +738,8 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { td := testdata.GenerateTraces(spansPerRequest) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() } @@ -784,8 +784,8 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { td := testdata.GenerateTraces(spansPerRequest) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - wg.Done() }() } @@ -827,8 +827,8 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty()) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - wg.Done() }() } @@ -836,8 +836,8 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { md := pmetric.NewMetrics() wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - wg.Done() }() wg.Wait() @@ -890,8 +890,8 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { size += sizer.MetricsSize(md) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - wg.Done() }() } wg.Wait() @@ -960,8 +960,8 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { md := testdata.GenerateMetrics(metricsPerRequest) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - wg.Done() }() } @@ -1005,8 +1005,8 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { md := testdata.GenerateMetrics(metricsPerRequest) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - wg.Done() }() } @@ -1167,8 +1167,8 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - wg.Done() }() } @@ -1176,8 +1176,8 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { ld := plog.NewLogs() wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - wg.Done() }() wg.Wait() @@ -1228,8 +1228,8 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { size += sizer.LogsSize(ld) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - wg.Done() }() } wg.Wait() @@ -1278,8 +1278,8 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { ld := testdata.GenerateLogs(logsPerRequest) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - wg.Done() }() } @@ -1323,8 +1323,8 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { ld := testdata.GenerateLogs(logsPerRequest) wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - wg.Done() }() } @@ -1380,8 +1380,8 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac for i := 0; i < generatedCount; i++ { wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, proc.ConsumeTraces(context.Background(), testdata.GenerateTraces(1))) - wg.Done() }() } @@ -1482,8 +1482,8 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { expectByContext[num] += spansPerRequest wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(callCtxs[num], td)) - wg.Done() }() } @@ -1546,8 +1546,8 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { wg.Add(1) go func() { + defer wg.Done() assert.NoError(t, batcher.ConsumeTraces(ctx, td)) - wg.Done() }() } @@ -1561,9 +1561,9 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { wg.Add(1) go func() { + defer wg.Done() err := batcher.ConsumeTraces(ctx, td) assert.ErrorIs(t, err, errTooManyBatchers) - wg.Done() }() wg.Wait() diff --git a/pkg/otel/arrow_record/producer.go b/pkg/otel/arrow_record/producer.go index 6b6926faac..3b88642a6c 100644 --- a/pkg/otel/arrow_record/producer.go +++ b/pkg/otel/arrow_record/producer.go @@ -21,7 +21,6 @@ import ( "sort" "time" - "github.com/HdrHistogram/hdrhistogram-go" "github.com/apache/arrow/go/v14/arrow" "github.com/apache/arrow/go/v14/arrow/ipc" "github.com/apache/arrow/go/v14/arrow/memory" @@ -240,7 +239,7 @@ func (p *Producer) BatchArrowRecordsFromMetrics(metrics pmetric.Metrics) (*colar if err != nil { return nil, werror.Wrap(err) } - p.stats.MetricsBatchesProduced++ + p.stats.MetricsBatchesProduced.Add(1) return bar, nil } @@ -271,7 +270,7 @@ func (p *Producer) BatchArrowRecordsFromLogs(ls plog.Logs) (*colarspb.BatchArrow if err != nil { return nil, werror.Wrap(err) } - p.stats.LogsBatchesProduced++ + p.stats.LogsBatchesProduced.Add(1) return bar, nil } @@ -302,7 +301,7 @@ func (p *Producer) BatchArrowRecordsFromTraces(ts ptrace.Traces) (*colarspb.Batc if err != nil { return nil, werror.Wrap(err) } - p.stats.TracesBatchesProduced++ + p.stats.TracesBatchesProduced.Add(1) return bar, nil } @@ -347,7 +346,7 @@ func (p *Producer) Close() error { if err := sp.ipcWriter.Close(); err != nil { return werror.Wrap(err) } - p.stats.StreamProducersClosed++ + p.stats.StreamProducersClosed.Add(1) } return nil } @@ -386,7 +385,7 @@ func (p *Producer) Produce(rms []*record_message.RecordMessage) (*colarspb.Batch if err := sp.ipcWriter.Close(); err != nil { return werror.Wrap(err) } - p.stats.StreamProducersClosed++ + p.stats.StreamProducersClosed.Add(1) delete(p.streamProducers, ssID) } } @@ -399,7 +398,7 @@ func (p *Producer) Produce(rms []*record_message.RecordMessage) (*colarspb.Batch } p.streamProducers[rm.SchemaID()] = sp p.nextSchemaId++ - p.stats.StreamProducersCreated++ + p.stats.StreamProducersCreated.Add(1) } sp.lastProduction = time.Now() @@ -433,19 +432,7 @@ func (p *Producer) Produce(rms []*record_message.RecordMessage) (*colarspb.Batch payloadType := rm.PayloadType().String() recordSize := int64(len(buf)) - recordSizeDist, ok := p.stats.RecordBuilderStats.RecordSizeDistribution[payloadType] - if !ok { - recordSizeDist = &pstats.RecordSizeStats{ - TotalSize: 0, - Dist: hdrhistogram.New(0, 1<<32, 2), - } - p.stats.RecordBuilderStats.RecordSizeDistribution[payloadType] = recordSizeDist - } - - recordSizeDist.TotalSize += recordSize - if err := recordSizeDist.Dist.RecordValue(recordSize); err != nil { - return werror.Wrap(err) - } + p.stats.RecordBuilderStats.Observe(payloadType, recordSize) if p.stats.RecordStats { fmt.Printf("Record %q -> %d bytes\n", payloadType, len(buf)) diff --git a/pkg/otel/common/schema/builder/record.go b/pkg/otel/common/schema/builder/record.go index 2dc393e6ea..64341d26ab 100644 --- a/pkg/otel/common/schema/builder/record.go +++ b/pkg/otel/common/schema/builder/record.go @@ -26,7 +26,7 @@ import ( carrow "github.com/open-telemetry/otel-arrow/pkg/arrow" "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema" - "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema/config" + builder "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema/config" "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema/events" "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema/transform" "github.com/open-telemetry/otel-arrow/pkg/otel/common/schema/update" @@ -287,7 +287,7 @@ func (rb *RecordBuilderExt) UpdateSchema() { rb.schemaID = carrow.SchemaToID(newSchema) rb.updateRequest.Reset() - rb.stats.RecordBuilderStats.SchemaUpdatesPerformed++ + rb.stats.RecordBuilderStats.SchemaUpdatesPerformed.Add(1) if rb.stats.SchemaUpdates { println("To =====>") diff --git a/pkg/otel/common/schema/transform/dictionary.go b/pkg/otel/common/schema/transform/dictionary.go index 8958edde3b..74d22d47bf 100644 --- a/pkg/otel/common/schema/transform/dictionary.go +++ b/pkg/otel/common/schema/transform/dictionary.go @@ -203,12 +203,12 @@ func (t *DictionaryField) updateIndexType(stats *stats.RecordBuilderStats) { t.currentIndex = 0 t.schemaUpdateRequest.Inc(&update.DictionaryOverflowEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal}) t.events.DictionariesWithOverflow[t.path] = true - stats.DictionaryOverflowDetected++ + stats.DictionaryOverflowDetected.Add(1) } } else if t.currentIndex != currentIndex { t.schemaUpdateRequest.Inc(&update.DictionaryUpgradeEvent{FieldName: t.path, PrevIndexType: prevIndexType, NewIndexType: t.IndexType(), Cardinality: t.cardinality, Total: t.cumulativeTotal}) t.events.DictionariesIndexTypeChanged[t.path] = t.indexTypes[t.currentIndex].Name() - stats.DictionaryIndexTypeChanged++ + stats.DictionaryIndexTypeChanged.Add(1) } return } diff --git a/pkg/otel/stats/stats.go b/pkg/otel/stats/stats.go index 4fe894bd87..7d72f4011f 100644 --- a/pkg/otel/stats/stats.go +++ b/pkg/otel/stats/stats.go @@ -22,6 +22,8 @@ package stats import ( "fmt" "sort" + "sync" + "sync/atomic" "github.com/HdrHistogram/hdrhistogram-go" ) @@ -29,11 +31,11 @@ import ( type ( // ProducerStats is a struct that contains stats about the OTLP Arrow Producer. ProducerStats struct { - MetricsBatchesProduced uint64 - LogsBatchesProduced uint64 - TracesBatchesProduced uint64 - StreamProducersCreated uint64 - StreamProducersClosed uint64 + MetricsBatchesProduced atomic.Uint64 + LogsBatchesProduced atomic.Uint64 + TracesBatchesProduced atomic.Uint64 + StreamProducersCreated atomic.Uint64 + StreamProducersClosed atomic.Uint64 RecordBuilderStats RecordBuilderStats // SchemaStats is a flag that indicates whether to display schema stats. @@ -52,31 +54,25 @@ type ( } RecordSizeStats struct { - TotalSize int64 - Dist *hdrhistogram.Histogram + totalSize int64 + dist *hdrhistogram.Histogram } RecordBuilderStats struct { - SchemaUpdatesPerformed uint64 - DictionaryIndexTypeChanged uint64 - DictionaryOverflowDetected uint64 - RecordSizeDistribution map[string]*RecordSizeStats + SchemaUpdatesPerformed atomic.Uint64 + DictionaryIndexTypeChanged atomic.Uint64 + DictionaryOverflowDetected atomic.Uint64 + + recordSizeDistributionLock sync.Mutex + recordSizeDistribution map[string]*RecordSizeStats } ) // NewProducerStats creates a new ProducerStats struct. func NewProducerStats() *ProducerStats { return &ProducerStats{ - MetricsBatchesProduced: 0, - LogsBatchesProduced: 0, - TracesBatchesProduced: 0, - StreamProducersCreated: 0, - StreamProducersClosed: 0, RecordBuilderStats: RecordBuilderStats{ - SchemaUpdatesPerformed: 0, - DictionaryIndexTypeChanged: 0, - DictionaryOverflowDetected: 0, - RecordSizeDistribution: make(map[string]*RecordSizeStats), + recordSizeDistribution: make(map[string]*RecordSizeStats), }, SchemaStats: false, SchemaUpdates: false, @@ -92,19 +88,19 @@ func (s *ProducerStats) GetAndReset() ProducerStats { // Reset sets all stats to zero. func (s *ProducerStats) Reset() { - s.MetricsBatchesProduced = 0 - s.LogsBatchesProduced = 0 - s.TracesBatchesProduced = 0 - s.StreamProducersCreated = 0 - s.StreamProducersClosed = 0 + s.MetricsBatchesProduced.Store(0) + s.LogsBatchesProduced.Store(0) + s.TracesBatchesProduced.Store(0) + s.StreamProducersCreated.Store(0) + s.StreamProducersClosed.Store(0) s.RecordBuilderStats.Reset() } // Reset sets all stats to zero. func (s *RecordBuilderStats) Reset() { - s.SchemaUpdatesPerformed = 0 - s.DictionaryIndexTypeChanged = 0 - s.DictionaryOverflowDetected = 0 + s.SchemaUpdatesPerformed.Store(0) + s.DictionaryIndexTypeChanged.Store(0) + s.DictionaryOverflowDetected.Store(0) } // Show prints the stats to the console. @@ -123,31 +119,54 @@ func (s *ProducerStats) RecordSizeStats() map[string]*RecordSizeStats { return s.RecordBuilderStats.RecordSizeStats() } +func (s *RecordBuilderStats) Observe(payloadType string, recordSize int64) { + s.recordSizeDistributionLock.Lock() + defer s.recordSizeDistributionLock.Unlock() + + recordSizeDist, ok := s.recordSizeDistribution[payloadType] + if !ok { + recordSizeDist = &RecordSizeStats{ + totalSize: 0, + dist: hdrhistogram.New(0, 1<<32, 2), + } + s.recordSizeDistribution[payloadType] = recordSizeDist + } + + recordSizeDist.totalSize += recordSize + _ = recordSizeDist.dist.RecordValue(recordSize) +} + +func (s *RecordSizeStats) TotalSize() int64 { + return s.totalSize +} + // Show prints the RecordBuilder stats to the console. func (s *RecordBuilderStats) Show(indent string) { fmt.Printf("%s- Schema updates performed: %d\n", indent, s.SchemaUpdatesPerformed) fmt.Printf("%s- Dictionary index type changed: %d\n", indent, s.DictionaryIndexTypeChanged) fmt.Printf("%s- Dictionary overflow detected: %d\n", indent, s.DictionaryOverflowDetected) - if len(s.RecordSizeDistribution) > 0 { + s.recordSizeDistributionLock.Lock() + defer s.recordSizeDistributionLock.Unlock() + if len(s.recordSizeDistribution) > 0 { type RecordSizeStats struct { PayloadType string TotalSize int64 - Dist *hdrhistogram.Histogram + Dist *hdrhistogram.Snapshot Percent float64 } var recordSizeStats []RecordSizeStats totalSize := int64(0) - for k, v := range s.RecordSizeDistribution { + for k, v := range s.recordSizeDistribution { recordSizeStats = append(recordSizeStats, RecordSizeStats{ PayloadType: k, - TotalSize: v.TotalSize, - Dist: v.Dist, + TotalSize: v.totalSize, + Dist: v.dist.Export(), Percent: 0, }) - totalSize += v.TotalSize + totalSize += v.totalSize } // Compute the percentage of each record size @@ -162,11 +181,12 @@ func (s *RecordBuilderStats) Show(indent string) { fmt.Printf("%s- Record size distribution:\n", indent) for _, v := range recordSizeStats { + dist := hdrhistogram.Import(v.Dist) fmt.Printf("%s - %-18s: %8d bytes (%04.1f%%) (min: %7d, max: %7d, mean: %7.1f, stdev: %7.1f, p50: %7d, p99: %7d)\n", indent, v.PayloadType, v.TotalSize, v.Percent, - v.Dist.Min(), v.Dist.Max(), v.Dist.Mean(), - v.Dist.StdDev(), - v.Dist.ValueAtQuantile(50), v.Dist.ValueAtQuantile(99), + dist.Min(), dist.Max(), dist.Mean(), + dist.StdDev(), + dist.ValueAtQuantile(50), dist.ValueAtQuantile(99), ) } } @@ -174,7 +194,22 @@ func (s *RecordBuilderStats) Show(indent string) { // RecordSizeStats returns statistics per record payload type. func (s *RecordBuilderStats) RecordSizeStats() map[string]*RecordSizeStats { - return s.RecordSizeDistribution + s.recordSizeDistributionLock.Lock() + defer s.recordSizeDistributionLock.Lock() + + m := map[string]*RecordSizeStats{} + + for k, v := range s.recordSizeDistribution { + rss := &RecordSizeStats{ + totalSize: v.totalSize, + dist: hdrhistogram.Import(v.dist.Export()), + } + rss.dist.Merge(v.dist) + m[k] = rss + + } + + return s.recordSizeDistribution } // CompareRecordSizeStats compares the record size stats with and without compression @@ -194,15 +229,15 @@ func CompareRecordSizeStats(withCompression map[string]*RecordSizeStats, withNoC statsWithNoCompression, ok := withNoCompression[payloadType] totalSizeWithNoCompression := int64(0) if ok { - totalSizeWithNoCompression = statsWithNoCompression.TotalSize + totalSizeWithNoCompression = statsWithNoCompression.totalSize } recordSizeStats = append(recordSizeStats, RecordSizeStats{ PayloadType: payloadType, - TotalSizeWithCompression: stats.TotalSize, + TotalSizeWithCompression: stats.totalSize, TotalSizeWithNoCompression: totalSizeWithNoCompression, Percent: 0, }) - totalSize += stats.TotalSize + totalSize += stats.totalSize } // Compute the percentage of each record size (with compression) diff --git a/tools/trace_analyzer/main.go b/tools/trace_analyzer/main.go index 29fc3a5dc7..1673d76b79 100644 --- a/tools/trace_analyzer/main.go +++ b/tools/trace_analyzer/main.go @@ -243,7 +243,7 @@ func runTrial( totalSize := int64(0) for _, s := range withCompressionStats { - totalSize += s.TotalSize + totalSize += s.TotalSize() } return totalSize