Commit

Limit concurrent writes to staged files and spill others to the object backend, to allow concurrent uploads to two backends and even double the write bandwidth

Signed-off-by: Changxin Miao <[email protected]>
polyrabbit committed Apr 23, 2024
1 parent 54e0300 commit 3e36f8a
Showing 5 changed files with 87 additions and 43 deletions.
5 changes: 5 additions & 0 deletions cmd/flags.go
@@ -127,6 +127,11 @@ func storageFlags() []cli.Flag {
Value: 20,
Usage: "number of connections to upload",
},
&cli.IntFlag{
Name: "max-stage-write",
Value: 0, // Enable this to have concurrent uploads to two backends, and get write bandwidth equal to the sum of the two
Usage: "number of threads allowed to write staged files, other requests will be uploaded directly (this option is only effective when 'writeback' mode is enabled)",
},
&cli.IntFlag{
Name: "max-deletes",
Value: 10,
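A hedged usage sketch (the flag value is illustrative, not a recommendation): since the new limit only takes effect in writeback mode, a mount that spills excess writes to the object backend would look like

$ juicefs mount redis://localhost /mnt/jfs --writeback --max-stage-write 10

With roughly ten staged writes already in flight, further blocks skip the local staging directory and upload straight to object storage, so disk and network bandwidth add up instead of queuing behind the disk.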
3 changes: 2 additions & 1 deletion cmd/mount.go
@@ -83,7 +83,7 @@ $ juicefs mount redis://localhost /mnt/jfs --backup-meta 0`,

func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
var ip, port string
//default set
// default set
ip, port, err := net.SplitHostPort(c.String("metrics"))
if err != nil {
logger.Fatalf("metrics format error: %v", err)
@@ -323,6 +323,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
GetTimeout: utils.Duration(c.String("get-timeout")),
PutTimeout: utils.Duration(c.String("put-timeout")),
MaxUpload: c.Int("max-uploads"),
MaxStageWrite: c.Int("max-stage-write"),
MaxRetries: c.Int("io-retries"),
Writeback: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
21 changes: 15 additions & 6 deletions pkg/chunk/cached_store.go
@@ -438,8 +438,10 @@ func (s *wSlice) upload(indx int) {
if s.store.conf.Writeback {
stagingPath, err := s.store.bcache.stage(key, block.Data, s.store.shouldCache(blen))
if err != nil {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
if !errors.Is(err, errStageConcurrency) {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
}
} else {
s.errors <- nil
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
@@ -538,6 +540,7 @@ type Config struct {
AutoCreate bool
Compress string
MaxUpload int
MaxStageWrite int
MaxRetries int
UploadLimit int64 // bytes per second
DownloadLimit int64 // bytes per second
@@ -563,10 +566,16 @@ func (c *Config) SelfCheck(uuid string) {
}
c.CacheDir = "memory"
}
if !c.Writeback && (c.UploadDelay > 0 || c.UploadHours != "") {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
if !c.Writeback {
if c.UploadDelay > 0 || c.UploadHours != "" {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
}
if c.MaxStageWrite > 0 {
logger.Warnf("max-stage-write is disabled in non-writeback mode")
c.MaxStageWrite = 0
}
}
if _, _, err := c.parseHours(); err != nil {
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
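To make the control flow above easier to follow, here is a minimal, self-contained sketch of the spill decision (the names writeBlock, stage, and uploadDirect are illustrative, not the real API); hitting the concurrency cap is treated as an intentional spill rather than a staging fault:

package main

import (
	"errors"
	"fmt"
)

var errStageConcurrency = errors.New("concurrent staging limit reached")

// writeBlock mirrors the writeback branch of wSlice.upload in spirit:
// stage locally when allowed, otherwise fall through to a direct upload.
func writeBlock(writeback bool, stage, uploadDirect func() error) error {
	if !writeback {
		return uploadDirect()
	}
	if err := stage(); err != nil {
		// Only genuine staging failures are counted and logged; the
		// concurrency cap is an expected signal to take the direct path.
		if !errors.Is(err, errStageConcurrency) {
			fmt.Println("stage failed, uploading directly:", err)
		}
		return uploadDirect()
	}
	return nil // staged locally; the async uploader picks it up later
}

func main() {
	_ = writeBlock(true,
		func() error { return errStageConcurrency }, // cap reached
		func() error { fmt.Println("uploaded directly"); return nil },
	)
}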
85 changes: 50 additions & 35 deletions pkg/chunk/disk_cache.go
@@ -45,12 +45,15 @@ import (
)

var (
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
stagingBlocks atomic.Int64
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
errStageFull = errors.New("space not enough on device")
errStageConcurrency = errors.New("concurrent staging limit reached")
)

type cacheKey struct {
@@ -75,16 +78,17 @@ type cacheStore struct {
id string
totalPages int64
sync.Mutex
dir string
mode os.FileMode
capacity int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
cacheExpire time.Duration
pending chan pendingFile
pages map[string]*Page
m *cacheManagerMetrics
dir string
mode os.FileMode
maxStageWrite int
capacity int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
cacheExpire time.Duration
pending chan pendingFile
pages map[string]*Page
m *cacheManagerMetrics

used int64
keys map[cacheKey]cacheItem
@@ -108,21 +112,22 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP
config.FreeSpace = 0.1 // 10%
}
c := &cacheStore{
m: m,
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
checksum: config.CacheChecksum,
hashPrefix: config.HashPrefix,
scanInterval: config.CacheScanInterval,
cacheExpire: config.CacheExpire,
keys: make(map[cacheKey]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
uploader: uploader,
opTs: make(map[time.Duration]func() error),
m: m,
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
maxStageWrite: config.MaxStageWrite,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
checksum: config.CacheChecksum,
hashPrefix: config.HashPrefix,
scanInterval: config.CacheScanInterval,
cacheExpire: config.CacheExpire,
keys: make(map[cacheKey]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
uploader: uploader,
opTs: make(map[time.Duration]func() error),
}
c.createDir(c.dir)
br, fr := c.curFreeRatio()
@@ -551,8 +556,12 @@ func (cache *cacheStore) remove(key string) {
}
cache.Unlock()
if path != "" {
_ = cache.removeFile(path)
_ = cache.removeStage(key)
if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", path, err)
}
if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", cache.stagePath(key), err)
}
}
}

@@ -645,12 +654,18 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {
func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string, error) {
stagingPath := cache.stagePath(key)
if cache.stageFull {
return stagingPath, errors.New("space not enough on device")
return stagingPath, errStageFull
}
if cache.maxStageWrite != 0 && stagingBlocks.Load() > int64(cache.maxStageWrite) {
return stagingPath, errStageConcurrency
}
stagingBlocks.Add(1)
defer stagingBlocks.Add(-1)
err := cache.flushPage(stagingPath, data)
if err == nil {
cache.m.stageBlocks.Add(1)
cache.m.stageBlockBytes.Add(float64(len(data)))
cache.m.stageWriteBytes.Add(float64(len(data)))
if cache.capacity > 0 && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
@@ -845,7 +860,7 @@ func (cache *cacheStore) scanCached() {

cache.Lock()
cache.scanned = true
logger.Debugf("Found %d cached blocks (%d bytes) in %s with %s", len(cache.keys), cache.used, cache.dir, time.Since(start))
logger.Debugf("Found %d cached blocks (%s) in %s with %s", len(cache.keys), humanize.IBytes(uint64(cache.used)), cache.dir, time.Since(start))
cache.Unlock()
}

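The heart of the change is the gate at the top of stage(): a package-level atomic counter is compared against the cap and held for the duration of the write. A standalone sketch of the same pattern follows (names assumed); note that the Load-then-Add pair is not atomic as a unit, so racing writers can briefly overshoot the cap, a deliberate trade-off that keeps the hot write path lock-free:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errStageConcurrency = errors.New("concurrent staging limit reached")

var inflight atomic.Int64 // staged writes currently in progress

// stageWrite admits roughly max concurrent staged writes; a zero max
// disables the limit, matching the flag's default.
func stageWrite(max int64, write func() error) error {
	if max != 0 && inflight.Load() > max {
		return errStageConcurrency // caller spills to a direct upload
	}
	inflight.Add(1)
	defer inflight.Add(-1)
	return write()
}

func main() {
	fmt.Println(stageWrite(1, func() error { return nil })) // <nil>
}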
16 changes: 15 additions & 1 deletion pkg/chunk/metrics.go
@@ -16,7 +16,9 @@

package chunk

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)

// CacheManager Metrics
type cacheManagerMetrics struct {
@@ -27,6 +29,7 @@ type cacheManagerMetrics struct {
cacheWriteHist prometheus.Histogram
stageBlocks prometheus.Gauge
stageBlockBytes prometheus.Gauge
stageWriteBytes prometheus.Counter
}

func newCacheManagerMetrics(reg prometheus.Registerer) *cacheManagerMetrics {
@@ -45,6 +48,13 @@ func (c *cacheManagerMetrics) registerMetrics(reg prometheus.Registerer) {
reg.MustRegister(c.cacheWriteBytes)
reg.MustRegister(c.stageBlocks)
reg.MustRegister(c.stageBlockBytes)
reg.MustRegister(c.stageWriteBytes)
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "staging_writing_blocks",
Help: "Number of writing blocks in staging.",
}, func() float64 {
return float64(stagingBlocks.Load())
}))
}
}

@@ -78,4 +88,8 @@ func (c *cacheManagerMetrics) initMetrics() {
Name: "staging_block_bytes",
Help: "Total bytes of blocks in the staging path.",
})
c.stageWriteBytes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "staging_write_bytes",
Help: "Total bytes written to the staging path.",
})
}
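
Because the in-flight count lives in a plain atomic rather than in a metric the code updates imperatively, it is exposed through prometheus.NewGaugeFunc, which samples a callback at scrape time. A minimal sketch of that pattern (registry setup and port are illustrative):

package main

import (
	"net/http"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var stagingBlocks atomic.Int64

func main() {
	reg := prometheus.NewRegistry()
	// The value is recomputed on every scrape, so the atomic counter
	// stays the single source of truth and no Set() calls are needed.
	reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "staging_writing_blocks",
		Help: "Number of blocks being written to the staging path.",
	}, func() float64 {
		return float64(stagingBlocks.Load())
	}))
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}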
