Skip to content

Commit

Permalink
chunk: Enable concurrent write to both stage files and object storage to get higher write throughput (#4743)
Browse files Browse the repository at this point in the history

Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit authored Apr 23, 2024
1 parent a49b59d commit 4609254
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 43 deletions.
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func storageFlags() []cli.Flag {
Value: 20,
Usage: "number of connections to upload",
},
&cli.IntFlag{
Name: "max-stage-write",
Value: 0, // Enable this to have concurrent uploads to two backends, and get write bandwidth equals to sum of the two
Usage: "number of threads allowed to write staged files, other requests will be uploaded directly (this option is only effective when 'writeback' mode is enabled)",
},
&cli.IntFlag{
Name: "max-deletes",
Value: 10,
Expand Down
3 changes: 2 additions & 1 deletion cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ $ juicefs mount redis://localhost /mnt/jfs --backup-meta 0`,

func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
var ip, port string
//default set
// default set
ip, port, err := net.SplitHostPort(c.String("metrics"))
if err != nil {
logger.Fatalf("metrics format error: %v", err)
Expand Down Expand Up @@ -323,6 +323,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
GetTimeout: utils.Duration(c.String("get-timeout")),
PutTimeout: utils.Duration(c.String("put-timeout")),
MaxUpload: c.Int("max-uploads"),
MaxStageWrite: c.Int("max-stage-write"),
MaxRetries: c.Int("io-retries"),
Writeback: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
Expand Down
21 changes: 15 additions & 6 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,10 @@ func (s *wSlice) upload(indx int) {
if s.store.conf.Writeback {
stagingPath, err := s.store.bcache.stage(key, block.Data, s.store.shouldCache(blen))
if err != nil {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
if !errors.Is(err, errStageConcurrency) {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
}
} else {
s.errors <- nil
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
Expand Down Expand Up @@ -538,6 +540,7 @@ type Config struct {
AutoCreate bool
Compress string
MaxUpload int
MaxStageWrite int
MaxRetries int
UploadLimit int64 // bytes per second
DownloadLimit int64 // bytes per second
Expand All @@ -563,10 +566,16 @@ func (c *Config) SelfCheck(uuid string) {
}
c.CacheDir = "memory"
}
if !c.Writeback && (c.UploadDelay > 0 || c.UploadHours != "") {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
if !c.Writeback {
if c.UploadDelay > 0 || c.UploadHours != "" {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
}
if c.MaxStageWrite > 0 {
logger.Warnf("max-stage-write is disabled in non-writeback mode")
c.MaxStageWrite = 0
}
}
if _, _, err := c.parseHours(); err != nil {
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
Expand Down
85 changes: 50 additions & 35 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ import (
)

var (
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
stagingBlocks atomic.Int64
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
errStageFull = errors.New("space not enough on device")
errStageConcurrency = errors.New("concurrent staging limit reached")
)

type cacheKey struct {
Expand All @@ -75,16 +78,17 @@ type cacheStore struct {
id string
totalPages int64
sync.Mutex
dir string
mode os.FileMode
capacity int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
cacheExpire time.Duration
pending chan pendingFile
pages map[string]*Page
m *cacheManagerMetrics
dir string
mode os.FileMode
maxStageWrite int
capacity int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
cacheExpire time.Duration
pending chan pendingFile
pages map[string]*Page
m *cacheManagerMetrics

used int64
keys map[cacheKey]cacheItem
Expand All @@ -108,21 +112,22 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP
config.FreeSpace = 0.1 // 10%
}
c := &cacheStore{
m: m,
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
checksum: config.CacheChecksum,
hashPrefix: config.HashPrefix,
scanInterval: config.CacheScanInterval,
cacheExpire: config.CacheExpire,
keys: make(map[cacheKey]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
uploader: uploader,
opTs: make(map[time.Duration]func() error),
m: m,
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
maxStageWrite: config.MaxStageWrite,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
checksum: config.CacheChecksum,
hashPrefix: config.HashPrefix,
scanInterval: config.CacheScanInterval,
cacheExpire: config.CacheExpire,
keys: make(map[cacheKey]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
uploader: uploader,
opTs: make(map[time.Duration]func() error),
}
c.createDir(c.dir)
br, fr := c.curFreeRatio()
Expand Down Expand Up @@ -551,8 +556,12 @@ func (cache *cacheStore) remove(key string) {
}
cache.Unlock()
if path != "" {
_ = cache.removeFile(path)
_ = cache.removeStage(key)
if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", path, err)
}
if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", cache.stagePath(key), err)
}
}
}

Expand Down Expand Up @@ -645,12 +654,18 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {
func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string, error) {
stagingPath := cache.stagePath(key)
if cache.stageFull {
return stagingPath, errors.New("space not enough on device")
return stagingPath, errStageFull
}
if cache.maxStageWrite != 0 && stagingBlocks.Load() > int64(cache.maxStageWrite) {
return stagingPath, errStageConcurrency
}
stagingBlocks.Add(1)
defer stagingBlocks.Add(-1)
err := cache.flushPage(stagingPath, data)
if err == nil {
cache.m.stageBlocks.Add(1)
cache.m.stageBlockBytes.Add(float64(len(data)))
cache.m.stageWriteBytes.Add(float64(len(data)))
if cache.capacity > 0 && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
Expand Down Expand Up @@ -845,7 +860,7 @@ func (cache *cacheStore) scanCached() {

cache.Lock()
cache.scanned = true
logger.Debugf("Found %d cached blocks (%d bytes) in %s with %s", len(cache.keys), cache.used, cache.dir, time.Since(start))
logger.Debugf("Found %d cached blocks (%s) in %s with %s", len(cache.keys), humanize.IBytes(uint64(cache.used)), cache.dir, time.Since(start))
cache.Unlock()
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/chunk/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package chunk

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)

// CacheManager Metrics
type cacheManagerMetrics struct {
Expand All @@ -27,6 +29,7 @@ type cacheManagerMetrics struct {
cacheWriteHist prometheus.Histogram
stageBlocks prometheus.Gauge
stageBlockBytes prometheus.Gauge
stageWriteBytes prometheus.Counter
}

func newCacheManagerMetrics(reg prometheus.Registerer) *cacheManagerMetrics {
Expand All @@ -45,6 +48,13 @@ func (c *cacheManagerMetrics) registerMetrics(reg prometheus.Registerer) {
reg.MustRegister(c.cacheWriteBytes)
reg.MustRegister(c.stageBlocks)
reg.MustRegister(c.stageBlockBytes)
reg.MustRegister(c.stageWriteBytes)
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "staging_writing_blocks",
Help: "Number of writing blocks in staging.",
}, func() float64 {
return float64(stagingBlocks.Load())
}))
}
}

Expand Down Expand Up @@ -78,4 +88,8 @@ func (c *cacheManagerMetrics) initMetrics() {
Name: "staging_block_bytes",
Help: "Total bytes of blocks in the staging path.",
})
c.stageWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "staging_write_bytes",
Help: "write bytes of blocks in the staging path.",
})
}

0 comments on commit 4609254

Please sign in to comment.