From 46092547503a5225301896f8789873a10dafc24f Mon Sep 17 00:00:00 2001
From: Changxin Miao
Date: Tue, 23 Apr 2024 15:41:14 +0800
Subject: [PATCH] chunk: Enable concurrent write to both stage files and
 object storage to get higher write throughput (#4743)

Signed-off-by: Changxin Miao
---
 cmd/flags.go              |  5 +++
 cmd/mount.go              |  3 +-
 pkg/chunk/cached_store.go | 21 +++++++---
 pkg/chunk/disk_cache.go   | 85 +++++++++++++++++++++++----------
 pkg/chunk/metrics.go      | 16 +++++++-
 5 files changed, 87 insertions(+), 43 deletions(-)

diff --git a/cmd/flags.go b/cmd/flags.go
index df56d82e2373..d0b72efa2e48 100644
--- a/cmd/flags.go
+++ b/cmd/flags.go
@@ -127,6 +127,11 @@ func storageFlags() []cli.Flag {
 			Value: 20,
 			Usage: "number of connections to upload",
 		},
+		&cli.IntFlag{
+			Name:  "max-stage-write",
+			Value: 0, // set a positive value to upload to both backends concurrently, so the write bandwidth is roughly the sum of the two
+			Usage: "number of threads allowed to write staged files; requests beyond this limit are uploaded directly (only effective in 'writeback' mode)",
+		},
 		&cli.IntFlag{
 			Name:  "max-deletes",
 			Value: 10,
diff --git a/cmd/mount.go b/cmd/mount.go
index 8a8a03357c17..4e21dddba641 100644
--- a/cmd/mount.go
+++ b/cmd/mount.go
@@ -83,7 +83,7 @@ $ juicefs mount redis://localhost /mnt/jfs --backup-meta 0`,
 
 func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
 	var ip, port string
-	//default set
+	// default setting
 	ip, port, err := net.SplitHostPort(c.String("metrics"))
 	if err != nil {
 		logger.Fatalf("metrics format error: %v", err)
@@ -323,6 +323,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
 		GetTimeout:    utils.Duration(c.String("get-timeout")),
 		PutTimeout:    utils.Duration(c.String("put-timeout")),
 		MaxUpload:     c.Int("max-uploads"),
+		MaxStageWrite: c.Int("max-stage-write"),
 		MaxRetries:    c.Int("io-retries"),
 		Writeback:     c.Bool("writeback"),
 		Prefetch:      c.Int("prefetch"),
diff --git a/pkg/chunk/cached_store.go b/pkg/chunk/cached_store.go
index ef5fd22b540b..ee730181e147 100644
--- a/pkg/chunk/cached_store.go
+++ b/pkg/chunk/cached_store.go
@@ -438,8 +438,10 @@ func (s *wSlice) upload(indx int) {
 	if s.store.conf.Writeback {
 		stagingPath, err := s.store.bcache.stage(key, block.Data, s.store.shouldCache(blen))
 		if err != nil {
-			s.store.stageBlockErrors.Add(1)
-			logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
+			if !errors.Is(err, errStageConcurrency) {
+				s.store.stageBlockErrors.Add(1)
+				logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
+			}
 		} else {
 			s.errors <- nil
 			if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
@@ -538,6 +540,7 @@ type Config struct {
 	AutoCreate    bool
 	Compress      string
 	MaxUpload     int
+	MaxStageWrite int
 	MaxRetries    int
 	UploadLimit   int64 // bytes per second
 	DownloadLimit int64 // bytes per second
@@ -563,10 +566,16 @@ func (c *Config) SelfCheck(uuid string) {
 		}
 		c.CacheDir = "memory"
 	}
-	if !c.Writeback && (c.UploadDelay > 0 || c.UploadHours != "") {
-		logger.Warnf("delayed upload is disabled in non-writeback mode")
-		c.UploadDelay = 0
-		c.UploadHours = ""
+	if !c.Writeback {
+		if c.UploadDelay > 0 || c.UploadHours != "" {
+			logger.Warnf("delayed upload is disabled in non-writeback mode")
+			c.UploadDelay = 0
+			c.UploadHours = ""
+		}
+		if c.MaxStageWrite > 0 {
+			logger.Warnf("max-stage-write is disabled in non-writeback mode")
+			c.MaxStageWrite = 0
+		}
 	}
 	if _, _, err := c.parseHours(); err != nil {
 		logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
%s", c.UploadHours, err) diff --git a/pkg/chunk/disk_cache.go b/pkg/chunk/disk_cache.go index 31badd624f06..ac3a4cb3664e 100644 --- a/pkg/chunk/disk_cache.go +++ b/pkg/chunk/disk_cache.go @@ -45,12 +45,15 @@ import ( ) var ( - stagingDir = "rawstaging" - cacheDir = "raw" - maxIODur = time.Second * 20 - maxIOErrors = 10 - errNotCached = errors.New("not cached") - errCacheDown = errors.New("cache down") + stagingDir = "rawstaging" + cacheDir = "raw" + maxIODur = time.Second * 20 + maxIOErrors = 10 + stagingBlocks atomic.Int64 + errNotCached = errors.New("not cached") + errCacheDown = errors.New("cache down") + errStageFull = errors.New("space not enough on device") + errStageConcurrency = errors.New("concurrent staging limit reached") ) type cacheKey struct { @@ -75,16 +78,17 @@ type cacheStore struct { id string totalPages int64 sync.Mutex - dir string - mode os.FileMode - capacity int64 - freeRatio float32 - hashPrefix bool - scanInterval time.Duration - cacheExpire time.Duration - pending chan pendingFile - pages map[string]*Page - m *cacheManagerMetrics + dir string + mode os.FileMode + maxStageWrite int + capacity int64 + freeRatio float32 + hashPrefix bool + scanInterval time.Duration + cacheExpire time.Duration + pending chan pendingFile + pages map[string]*Page + m *cacheManagerMetrics used int64 keys map[cacheKey]cacheItem @@ -108,21 +112,22 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP config.FreeSpace = 0.1 // 10% } c := &cacheStore{ - m: m, - dir: dir, - mode: config.CacheMode, - capacity: cacheSize, - freeRatio: config.FreeSpace, - eviction: config.CacheEviction, - checksum: config.CacheChecksum, - hashPrefix: config.HashPrefix, - scanInterval: config.CacheScanInterval, - cacheExpire: config.CacheExpire, - keys: make(map[cacheKey]cacheItem), - pending: make(chan pendingFile, pendingPages), - pages: make(map[string]*Page), - uploader: uploader, - opTs: make(map[time.Duration]func() error), + m: m, + dir: dir, + mode: config.CacheMode, + capacity: cacheSize, + maxStageWrite: config.MaxStageWrite, + freeRatio: config.FreeSpace, + eviction: config.CacheEviction, + checksum: config.CacheChecksum, + hashPrefix: config.HashPrefix, + scanInterval: config.CacheScanInterval, + cacheExpire: config.CacheExpire, + keys: make(map[cacheKey]cacheItem), + pending: make(chan pendingFile, pendingPages), + pages: make(map[string]*Page), + uploader: uploader, + opTs: make(map[time.Duration]func() error), } c.createDir(c.dir) br, fr := c.curFreeRatio() @@ -551,8 +556,12 @@ func (cache *cacheStore) remove(key string) { } cache.Unlock() if path != "" { - _ = cache.removeFile(path) - _ = cache.removeStage(key) + if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove %s failed: %s", path, err) + } + if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) { + logger.Warnf("remove %s failed: %s", cache.stagePath(key), err) + } } } @@ -645,12 +654,18 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) { func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string, error) { stagingPath := cache.stagePath(key) if cache.stageFull { - return stagingPath, errors.New("space not enough on device") + return stagingPath, errStageFull + } + if cache.maxStageWrite != 0 && stagingBlocks.Load() > int64(cache.maxStageWrite) { + return stagingPath, errStageConcurrency } + stagingBlocks.Add(1) + defer stagingBlocks.Add(-1) err := cache.flushPage(stagingPath, data) if err == nil { 
 		cache.m.stageBlocks.Add(1)
 		cache.m.stageBlockBytes.Add(float64(len(data)))
+		cache.m.stageWriteBytes.Add(float64(len(data)))
 		if cache.capacity > 0 && keepCache {
 			path := cache.cachePath(key)
 			cache.createDir(filepath.Dir(path))
@@ -845,7 +860,7 @@ func (cache *cacheStore) scanCached() {
 
 	cache.Lock()
 	cache.scanned = true
-	logger.Debugf("Found %d cached blocks (%d bytes) in %s with %s", len(cache.keys), cache.used, cache.dir, time.Since(start))
+	logger.Debugf("Found %d cached blocks (%s) in %s with %s", len(cache.keys), humanize.IBytes(uint64(cache.used)), cache.dir, time.Since(start))
 	cache.Unlock()
 }
 
diff --git a/pkg/chunk/metrics.go b/pkg/chunk/metrics.go
index 3ad9dad5edbc..1e1b125667d0 100644
--- a/pkg/chunk/metrics.go
+++ b/pkg/chunk/metrics.go
@@ -16,7 +16,9 @@
 
 package chunk
 
-import "github.com/prometheus/client_golang/prometheus"
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
 
 // CacheManager Metrics
 type cacheManagerMetrics struct {
@@ -27,6 +29,7 @@ type cacheManagerMetrics struct {
 	cacheWriteHist  prometheus.Histogram
 	stageBlocks     prometheus.Gauge
 	stageBlockBytes prometheus.Gauge
+	stageWriteBytes prometheus.Counter
 }
 
 func newCacheManagerMetrics(reg prometheus.Registerer) *cacheManagerMetrics {
@@ -45,6 +48,13 @@ func (c *cacheManagerMetrics) registerMetrics(reg prometheus.Registerer) {
 		reg.MustRegister(c.cacheWriteBytes)
 		reg.MustRegister(c.stageBlocks)
 		reg.MustRegister(c.stageBlockBytes)
+		reg.MustRegister(c.stageWriteBytes)
+		reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Name: "staging_writing_blocks",
+			Help: "Number of blocks currently being written to the staging path.",
+		}, func() float64 {
+			return float64(stagingBlocks.Load())
+		}))
 	}
 }
 
@@ -78,4 +88,8 @@ func (c *cacheManagerMetrics) initMetrics() {
 		Name: "staging_block_bytes",
 		Help: "Total bytes of blocks in the staging path.",
 	})
+	c.stageWriteBytes = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "staging_write_bytes",
+		Help: "Total bytes written to the staging path.",
+	})
 }
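-- 
A note on the mechanism, with an illustrative sketch (the stage helper and
the counts in main below are made up for illustration and are not part of
the JuiceFS API). The cap added to cacheStore.stage is a soft limit:
stagingBlocks.Load() and the following Add(1) are two separate atomic
operations, so a burst of writers can briefly exceed max-stage-write. That
is harmless here, because a block rejected by the limiter is simply
uploaded to object storage directly, which is exactly what lets staged
writes and direct uploads proceed in parallel.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	errStageConcurrency = errors.New("concurrent staging limit reached")
	stagingBlocks       atomic.Int64 // blocks currently being written to staging
)

// stage mirrors the admission check this patch adds to cacheStore.stage:
// admit the write only while the number of in-flight staging writes is
// within the limit; 0 means unlimited (the flag's default).
func stage(maxStageWrite int) error {
	if maxStageWrite != 0 && stagingBlocks.Load() > int64(maxStageWrite) {
		return errStageConcurrency // caller falls back to a direct upload
	}
	stagingBlocks.Add(1)
	defer stagingBlocks.Add(-1)
	time.Sleep(time.Millisecond) // stand-in for writing the staging file
	return nil
}

func main() {
	const maxStageWrite = 4
	var staged, direct atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 64; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if errors.Is(stage(maxStageWrite), errStageConcurrency) {
				direct.Add(1) // would go straight to object storage
			} else {
				staged.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("staged=%d direct=%d\n", staged.Load(), direct.Load())
}

With the flag wired through getChunkConf, a writeback mount that allows at
most 8 concurrent staged writes would look like:

    juicefs mount --writeback --max-stage-write 8 redis://localhost /mnt/jfs

Blocks rejected by the limiter bypass the staging directory, so the local
disk and the object store are written concurrently and the aggregate write
bandwidth approaches the sum of the two.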