
Commit 2dfbc7d

pkg/accesslogs: flush logs periodically
This change adds the ability for the engine (Processor) to flush logs
to uploader/storage at a per-key specified interval.

Fortunately, this change doesn't significantly hurt performance:

goos: linux
goarch: amd64
pkg: storj.io/common/accesslogs
cpu: AMD Ryzen 5 1600 Six-Core Processor
                                      │     OLD      │              NEW              │
                                      │    sec/op    │    sec/op     vs base         │
ParallelQueueEntry-12                   3.684µ ± 14%   3.527µ ± 10%  ~ (p=0.280 n=10)
QueueEntry-12                           4.399µ ± 11%   4.523µ ±  6%  ~ (p=0.739 n=10)
RandomKey-12                            3.746µ ±  7%   3.836µ ±  6%  ~ (p=0.190 n=10)
UniqueString-12                         1.853µ ±  5%   1.929µ ±  4%  ~ (p=0.109 n=10)
ParallelQueueEntryWithTimedFlush-12                    7.701µ ± 20%
QueueEntryWithTimedFlush-12                            8.101µ ±  1%
geomean                                 3.256µ         4.411µ        +1.22%

Closes storj/edge#497

Change-Id: I685302c9febb70739588992e4065e1b810bcfdb0
1 parent d4a752c commit 2dfbc7d
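Before the diff, here is a minimal sketch of how a caller might opt into the new interval-based flushing. It is not part of this commit; the import path storj.io/common/accesslogs is taken from the benchmark output above, and the values simply mirror the defaults declared in Options in the diff below.

package main

import (
    "time"

    "go.uber.org/zap"

    "storj.io/common/accesslogs"
    "storj.io/common/memory"
)

func main() {
    // Only DefaultShipmentInterval is new in this commit; setting it to 0
    // keeps the old size-only flushing behaviour.
    opts := accesslogs.Options{
        DefaultEntryLimit:       2 * memory.KiB,
        DefaultShipmentLimit:    63 * memory.MiB,
        DefaultShipmentInterval: time.Hour,
    }
    opts.UploadingOptions.QueueLimit = 100
    opts.UploadingOptions.RetryLimit = 3
    opts.UploadingOptions.ShutdownTimeout = time.Minute

    processor := accesslogs.NewProcessor(zap.NewExample(), opts)
    _ = processor // Run, QueueEntry and Close would follow in real use.
}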

2 files changed (+257 −38 lines)


accesslogs/processor.go

+136 −31
@@ -8,16 +8,21 @@ package accesslogs
 
 import (
     "bytes"
+    "context"
     "encoding/hex"
+    "runtime/pprof"
     "strings"
     "sync"
     "sync/atomic"
     "time"
 
     "github.com/spacemonkeygo/monkit/v3"
+    "github.com/zeebo/errs"
     "go.uber.org/zap"
 
+    "storj.io/common/errs2"
     "storj.io/common/memory"
+    "storj.io/common/sync2"
     "storj.io/common/uuid"
 )
 
@@ -28,10 +33,15 @@ const (
     defaultUploaderRetryLimit = 3
 )
 
-var mon = monkit.Package()
+var (
+    mon              = monkit.Package()
+    timedFlushLabels = pprof.Labels("accesslogs", "timedFlush")
+)
 
 // Key is a key that logs for the specified project ID and bucket can be
-// queued. It's not a key under which packed logs are saved.
+// queued.
+//
+// Logs are stored in the specified bucket under the specified prefix.
 type Key struct {
     PublicProjectID uuid.UUID
     Bucket          string
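The new timedFlushLabels value exists so the per-key flush goroutines are identifiable in CPU and goroutine profiles. A small standalone sketch of how runtime/pprof labels behave (standard library only; this is not code from the commit):

package main

import (
    "context"
    "fmt"
    "runtime/pprof"
)

func main() {
    labels := pprof.Labels("accesslogs", "timedFlush")

    // pprof.Do attaches the labels to the current goroutine for the duration
    // of the callback, so profile samples taken inside it carry the label.
    pprof.Do(context.Background(), labels, func(ctx context.Context) {
        value, ok := pprof.Label(ctx, "accesslogs")
        fmt.Println(value, ok) // timedFlush true
    })
}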
@@ -52,20 +62,34 @@ type Processor struct {
     log    *zap.Logger
     upload uploader
 
-    defaultEntryLimit    memory.Size
-    defaultShipmentLimit memory.Size
+    timedFlushes  errs2.Group
+    cancelFlushes sync2.Fence
+
+    defaultEntryLimit       memory.Size
+    defaultShipmentLimit    memory.Size
+    defaultShipmentInterval time.Duration
 
     globalLimit memory.Size
 
-    parcels    sync.Map
-    globalSize int64
+    // pendingWrites is used to prevent logical races between queuing
+    // new entries and timed flushes. If we don't use it, we risk losing
+    // entries because timed flush dereferences a parcel (to prevent
+    // trash buildup), which could happen during queuing if we didn't
+    // lock. RWMutex is used to deprioritize flushes from new writes.
+    // This locking is global for all keys instead of being per-key like
+    // the parcels map is, but it shouldn't be a big performance hit as
+    // long as locking that happens during the timed flush is brief.
+    pendingWrites sync.RWMutex
+    parcels       sync.Map
+    globalSize    int64
 }
 
 // Options define how Processor should be configured when initialized.
 type Options struct {
-    DefaultEntryLimit    memory.Size `user:"true" help:"log entry size limit" default:"2KiB"`
-    DefaultShipmentLimit memory.Size `user:"true" help:"log file size limit" default:"63MiB"`
-    UploadingOptions     struct {
+    DefaultEntryLimit       memory.Size   `user:"true" help:"log entry size limit" default:"2KiB"`
+    DefaultShipmentLimit    memory.Size   `user:"true" help:"log file size limit" default:"63MiB"`
+    DefaultShipmentInterval time.Duration `user:"true" help:"log file time limit regardless of size (0 means unlimited)" default:"1h"`
+    UploadingOptions        struct {
         QueueLimit      int           `user:"true" help:"log file upload queue limit" default:"100"`
         RetryLimit      int           `user:"true" help:"maximum number of retries for log file uploads" default:"3"`
         ShutdownTimeout time.Duration `user:"true" help:"time limit waiting for queued logs to finish uploading when gateway is shutting down" default:"1m"`
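A minimal sketch of the pendingWrites pattern described in the comment above, using hypothetical names (guardedMap, queue, flush): writers hold the read lock while touching the map, so a flush, which takes the write lock, can never drop an entry in the middle of a queued write.

package main

import (
    "fmt"
    "sync"
)

type guardedMap struct {
    mu sync.RWMutex // writers take RLock, the flusher takes Lock
    m  sync.Map
}

// queue runs under the read lock, so many writers can proceed concurrently
// but none overlaps with flush. (Per-key appends would still need their own
// mutex, as parcel has; a single goroutine is assumed here for brevity.)
func (g *guardedMap) queue(key, line string) {
    g.mu.RLock()
    defer g.mu.RUnlock()

    actual, _ := g.m.LoadOrStore(key, &[]string{})
    lines := actual.(*[]string)
    *lines = append(*lines, line)
}

// flush holds the write lock only for the brief LoadAndDelete; the slow part
// (uploading) happens after the lock is released, as timedFlush does below.
func (g *guardedMap) flush(key string) []string {
    g.mu.Lock()
    actual, ok := g.m.LoadAndDelete(key)
    g.mu.Unlock()

    if !ok {
        return nil
    }
    return *actual.(*[]string)
}

func main() {
    var g guardedMap
    g.queue("bucket", "GET /object")
    fmt.Println(g.flush("bucket")) // [GET /object]
}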
@@ -100,27 +124,33 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor {
             retryLimit:      opts.UploadingOptions.RetryLimit,
             shutdownTimeout: opts.UploadingOptions.ShutdownTimeout,
         }),
-        defaultEntryLimit:    opts.DefaultEntryLimit,
-        defaultShipmentLimit: opts.DefaultShipmentLimit,
-        globalLimit:          opts.DefaultShipmentLimit * 100,
+
+        defaultEntryLimit:       opts.DefaultEntryLimit,
+        defaultShipmentLimit:    opts.DefaultShipmentLimit,
+        defaultShipmentInterval: opts.DefaultShipmentInterval,
+
+        globalLimit: opts.DefaultShipmentLimit * 100,
     }
 }
 
 // QueueEntry saves another entry under key for packaging and upload.
-// Provided access will be used for upload.
+// store is saved only for the first time the key is seen.
 func (p *Processor) QueueEntry(store Storage, key Key, entry Entry) (err error) {
     defer mon.Task()(nil)(&err)
 
     entrySize := entry.Size().Int()
 
     if g := atomic.LoadInt64(&p.globalSize); g+int64(entrySize) > p.globalLimit.Int64() {
-        // NOTE(artur): we could return an error here, but we would have
-        // to flush immediately afterward.
+        // NOTE(artur): this is a best-effort check; we could return an
+        // error here, but we would have to flush immediately afterward.
        mon.Event("global_limit_exceeded")
        p.log.Warn("globalLimit exceeded", zap.Int64("limit", p.globalLimit.Int64()), zap.Int64("size", g))
     }
 
-    loaded, _ := p.parcels.LoadOrStore(key, &parcel{
+    p.pendingWrites.RLock()
+    defer p.pendingWrites.RUnlock()
+
+    actual, loaded := p.parcels.LoadOrStore(key, &parcel{
         // TODO(artur): make entryLimit & shipmentLimit configurable via
         // Entry.
         entryLimit: p.defaultEntryLimit.Int(),
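The global-size guard above is deliberately advisory: it reads the counter atomically, warns, and still accepts the entry. A tiny standalone sketch of that best-effort pattern (the names and the 1 MiB cap are illustrative, not from the commit):

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var globalSize int64
    const globalLimit = int64(1 << 20) // hypothetical 1 MiB cap

    queue := func(entrySize int64) {
        // Best-effort check: warn when the running total would exceed the
        // cap, but keep accepting entries; flushing later reclaims space.
        if g := atomic.LoadInt64(&globalSize); g+entrySize > globalLimit {
            fmt.Println("global limit exceeded at", g)
        }
        atomic.AddInt64(&globalSize, entrySize)
    }

    queue(1 << 19)
    queue(1 << 19)
    queue(1) // this one triggers the warning
}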
@@ -130,7 +160,19 @@ func (p *Processor) QueueEntry(store Storage, key Key, entry Entry) (err error)
         prefix: key.Prefix,
     })
 
-    parcel := loaded.(*parcel)
+    parcel := actual.(*parcel)
+
+    if !loaded && p.defaultShipmentInterval > 0 {
+        p.timedFlushes.Go(func() error {
+            var flushErr error
+            pprof.Do(context.Background(), timedFlushLabels, func(ctx context.Context) {
+                // TODO(artur): make defaultShipmentInterval
+                // configurable via Entry.
+                flushErr = p.timedFlush(key, p.defaultShipmentInterval)
+            })
+            return flushErr
+        })
+    }
 
     if entrySize > parcel.entryLimit {
         return Error.Wrap(ErrTooLarge)
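The !loaded branch above is how exactly one flush goroutine gets started per key: LoadOrStore reports whether the key was already present, so only the first writer for a key spawns the timer. A reduced sketch of that pattern (illustrative names, plain goroutines instead of errs2.Group):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var parcels sync.Map
    var flushes sync.WaitGroup

    queue := func(key string) {
        _, loaded := parcels.LoadOrStore(key, struct{}{})
        if !loaded {
            // First time this key is seen: start exactly one timed flush
            // for it, mirroring the !loaded check in QueueEntry.
            flushes.Add(1)
            go func() {
                defer flushes.Done()
                time.Sleep(10 * time.Millisecond) // stands in for the interval
                parcels.LoadAndDelete(key)
                fmt.Println("flushed", key)
            }()
        }
    }

    queue("project/bucket")
    queue("project/bucket") // already loaded, no second goroutine
    flushes.Wait()
}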
@@ -146,6 +188,39 @@ func (p *Processor) QueueEntry(store Storage, key Key, entry Entry) (err error)
     return nil
 }
 
+func (p *Processor) timedFlush(key Key, interval time.Duration) error {
+    t := time.NewTimer(interval)
+    defer t.Stop()
+
+    select {
+    case <-p.cancelFlushes.Done():
+        return nil
+    case <-t.C:
+        p.pendingWrites.Lock()
+        actual, _ := p.parcels.LoadAndDelete(key)
+        p.pendingWrites.Unlock()
+
+        parcel := actual.(*parcel)
+
+        shipped, err := parcel.flush(p.upload)
+
+        mon.IntVal("globalLimit").Observe(atomic.AddInt64(&p.globalSize, int64(-shipped)))
+        p.log.Debug("timed flush",
+            zap.String("PublicProjectID", key.PublicProjectID.String()),
+            zap.String("Bucket", key.Bucket),
+            zap.String("Prefix", key.Prefix),
+            zap.Int("size", shipped),
+            zap.Error(err),
+        )
+
+        if err != nil {
+            return errs.New("couldn't flush %s/%s/%s: %w", key.PublicProjectID, key.Bucket, key.Prefix, err)
+        }
+    }
+
+    return nil
+}
+
 // Run starts Processor.
 func (p *Processor) Run() error {
     return Error.Wrap(p.upload.run())
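timedFlush races a timer against the cancelFlushes fence: Release (called from Close, further down) wins the select, and the parcel is then drained by Close instead. A reduced sketch of that select, assuming the sync2.Fence type from storj.io/common/sync2 used in the diff:

package main

import (
    "fmt"
    "time"

    "storj.io/common/sync2"
)

func main() {
    var cancel sync2.Fence

    wait := func(interval time.Duration) string {
        t := time.NewTimer(interval)
        defer t.Stop()

        // Whichever fires first wins: a released fence means the processor
        // is closing, so the timed flush simply steps aside.
        select {
        case <-cancel.Done():
            return "cancelled"
        case <-t.C:
            return "flush"
        }
    }

    go func() {
        time.Sleep(10 * time.Millisecond)
        cancel.Release()
    }()

    fmt.Println(wait(time.Hour)) // prints "cancelled"
}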
@@ -162,6 +237,11 @@ func (p *Processor) Run() error {
 func (p *Processor) Close() (err error) {
     defer mon.Task()(nil)(&err)
 
+    var g errs.Group
+
+    p.cancelFlushes.Release()
+    g.Add(p.timedFlushes.Wait()...)
+
     p.parcels.Range(func(k, v any) bool {
         key, parcel := k.(Key), v.(*parcel)
         if err := parcel.close(p.upload); err != nil {
@@ -171,14 +251,19 @@ func (p *Processor) Close() (err error) {
                 zap.String("Prefix", key.Prefix),
                 zap.Error(err),
             )
+            g.Add(errs.New("couldn't close %s/%s/%s: %w", key.PublicProjectID, key.Bucket, key.Prefix, err))
         }
         return true
     })
-    return Error.Wrap(p.upload.close())
+
+    g.Add(p.upload.close())
+
+    return Error.Wrap(g.Err())
 }
 
 type parcel struct {
-    entryLimit, shipmentLimit int
+    entryLimit    int
+    shipmentLimit int
 
     store          Storage
     bucket, prefix string
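Close now releases the fence, waits for every flush goroutine via the errs2.Group, and folds each failure into a single error with errs.Group from github.com/zeebo/errs, so a failed parcel close no longer hides a failed uploader close. A small sketch of that accumulation (assumed, standard zeebo/errs usage):

package main

import (
    "fmt"

    "github.com/zeebo/errs"
)

func main() {
    var g errs.Group

    g.Add(nil) // nil errors are ignored
    g.Add(errs.New("couldn't close parcel"))
    g.Add(errs.New("couldn't close uploader"))

    // Err combines whatever accumulated into one error (or nil if empty).
    fmt.Println(g.Err())
}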
@@ -208,11 +293,13 @@ func (p *parcel) add(upload uploader, size int, s string) (shipped int, err erro
     if err != nil {
         return 0, err
     }
-    c := bytes.NewBuffer(nil)
-    if _, err = p.current.WriteTo(c); err != nil {
+    // we use cloneUnsafe here because we already have the lock.
+    c, err := p.cloneUnsafe()
+    if err != nil {
         return 0, err
     }
-    if err = upload.queueUpload(p.store, p.bucket, k, c.Bytes()); err != nil {
+    if err = upload.queueUpload(p.store, p.bucket, k, c); err != nil {
+        // FIXME(artur): rewind the buffer if we fail to upload.
         return 0, err
     }
     shipped = currentSize
@@ -222,29 +309,47 @@ func (p *parcel) add(upload uploader, size int, s string) (shipped int, err erro
     return shipped, nil
 }
 
-func (p *parcel) flush(upload uploader) error {
+// cloneUnsafe creates a thread-unsafe clone of the parcel.
+func (p *parcel) cloneUnsafe() ([]byte, error) {
+    c := bytes.NewBuffer(nil)
+    if _, err := p.current.WriteTo(c); err != nil {
+        return nil, err
+    }
+    return c.Bytes(), nil
+}
+
+// clone creates a thread-safe clone of the parcel.
+func (p *parcel) clone() ([]byte, error) {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    return p.cloneUnsafe()
+}
+
+func (p *parcel) flush(upload uploader) (int, error) {
     // NOTE(artur): here we need to queue upload without limits because when we
     // flush before close, we really want to drain all parcels as we won't have
     // the chance to trigger shipment later on.
     k, err := randomKey(p.prefix, time.Now())
     if err != nil {
-        return err
+        return 0, err
     }
-    c := bytes.NewBuffer(nil)
-    if _, err = p.current.WriteTo(c); err != nil {
-        return err
+    c, err := p.clone()
+    if err != nil {
+        return 0, err
     }
-    return upload.queueUploadWithoutQueueLimit(p.store, p.bucket, k, c.Bytes())
+    return len(c), upload.queueUploadWithoutQueueLimit(p.store, p.bucket, k, c)
 }
 
 func (p *parcel) close(upload uploader) error {
     p.mu.Lock()
-    defer p.mu.Unlock()
-
     if !p.closed {
         p.closed = true
-        return p.flush(upload)
+        p.mu.Unlock()
+        _, err := p.flush(upload)
+        return err
     }
+    p.mu.Unlock()
     return nil
 }
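The clone/cloneUnsafe split is the usual lock-then-delegate pattern: flush takes the parcel mutex via clone, while add, which already holds it, calls cloneUnsafe directly to avoid a self-deadlock. A standalone sketch of the same split on a plain bytes.Buffer (illustrative type, not the parcel itself); note that WriteTo drains the source buffer, which is what the FIXME about rewinding on failed uploads refers to.

package main

import (
    "bytes"
    "fmt"
    "sync"
)

type buffer struct {
    mu      sync.Mutex
    current bytes.Buffer
}

// cloneUnsafe copies (and drains) the pending bytes without locking; callers
// that already hold mu, like parcel.add, use this to avoid a self-deadlock.
func (b *buffer) cloneUnsafe() ([]byte, error) {
    c := bytes.NewBuffer(nil)
    if _, err := b.current.WriteTo(c); err != nil {
        return nil, err
    }
    return c.Bytes(), nil
}

// clone is the safe entry point for callers that do not hold mu, like flush.
func (b *buffer) clone() ([]byte, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    return b.cloneUnsafe()
}

func main() {
    var b buffer
    b.current.WriteString("GET /object 200\n")

    c, _ := b.clone()
    fmt.Printf("%q\n", c)        // "GET /object 200\n"
    fmt.Println(b.current.Len()) // 0: WriteTo drained the source
}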
