Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable concurrent write to both stage files and object storage to get 5GB/s write throughput #4743

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func storageFlags() []cli.Flag {
Value: 20,
Usage: "number of connections to upload",
},
&cli.IntFlag{
Name: "max-stage-write",
Value: 0, // Enable this to have concurrent uploads to two backends, and get write bandwidth equal to the sum of the two
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
Usage: "number of threads allowed to write staged files, other requests will be uploaded directly (this option is only effective when 'writeback' mode is enabled)",
},
&cli.IntFlag{
Name: "max-deletes",
Value: 10,
Expand Down
3 changes: 2 additions & 1 deletion cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ $ juicefs mount redis://localhost /mnt/jfs --backup-meta 0`,

func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
var ip, port string
//default set
// default set
ip, port, err := net.SplitHostPort(c.String("metrics"))
if err != nil {
logger.Fatalf("metrics format error: %v", err)
Expand Down Expand Up @@ -323,6 +323,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
GetTimeout: utils.Duration(c.String("get-timeout")),
PutTimeout: utils.Duration(c.String("put-timeout")),
MaxUpload: c.Int("max-uploads"),
MaxStageWrite: c.Int("max-stage-write"),
MaxRetries: c.Int("io-retries"),
Writeback: c.Bool("writeback"),
Prefetch: c.Int("prefetch"),
Expand Down
21 changes: 15 additions & 6 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,10 @@ func (s *wSlice) upload(indx int) {
if s.store.conf.Writeback {
stagingPath, err := s.store.bcache.stage(key, block.Data, s.store.shouldCache(blen))
if err != nil {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
if !errors.Is(err, errStageConcurrency) {
s.store.stageBlockErrors.Add(1)
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
}
} else {
s.errors <- nil
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
Expand Down Expand Up @@ -538,6 +540,7 @@ type Config struct {
AutoCreate bool
Compress string
MaxUpload int
MaxStageWrite int
MaxRetries int
UploadLimit int64 // bytes per second
DownloadLimit int64 // bytes per second
Expand All @@ -563,10 +566,16 @@ func (c *Config) SelfCheck(uuid string) {
}
c.CacheDir = "memory"
}
if !c.Writeback && (c.UploadDelay > 0 || c.UploadHours != "") {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
if !c.Writeback {
if c.UploadDelay > 0 || c.UploadHours != "" {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
}
if c.MaxStageWrite > 0 {
logger.Warnf("max-stage-write is disabled in non-writeback mode")
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
c.MaxStageWrite = 0
}
}
if _, _, err := c.parseHours(); err != nil {
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
Expand Down
85 changes: 50 additions & 35 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ import (
)

var (
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
stagingDir = "rawstaging"
cacheDir = "raw"
maxIODur = time.Second * 20
maxIOErrors = 10
stagingBlocks atomic.Int64
errNotCached = errors.New("not cached")
errCacheDown = errors.New("cache down")
errStageFull = errors.New("space not enough on device")
errStageConcurrency = errors.New("concurrent staging limit reached")
)

type cacheKey struct {
Expand All @@ -75,16 +78,17 @@ type cacheStore struct {
id string
totalPages int64
sync.Mutex
dir string
mode os.FileMode
capacity int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
cacheExpire time.Duration
pending chan pendingFile
pages map[string]*Page
m *cacheManagerMetrics
dir string
mode os.FileMode
maxStageWrite int
capacity int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
cacheExpire time.Duration
pending chan pendingFile
pages map[string]*Page
m *cacheManagerMetrics

used int64
keys map[cacheKey]cacheItem
Expand All @@ -108,21 +112,22 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP
config.FreeSpace = 0.1 // 10%
}
c := &cacheStore{
m: m,
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
checksum: config.CacheChecksum,
hashPrefix: config.HashPrefix,
scanInterval: config.CacheScanInterval,
cacheExpire: config.CacheExpire,
keys: make(map[cacheKey]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
uploader: uploader,
opTs: make(map[time.Duration]func() error),
m: m,
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
maxStageWrite: config.MaxStageWrite,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
checksum: config.CacheChecksum,
hashPrefix: config.HashPrefix,
scanInterval: config.CacheScanInterval,
cacheExpire: config.CacheExpire,
keys: make(map[cacheKey]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
uploader: uploader,
opTs: make(map[time.Duration]func() error),
}
c.createDir(c.dir)
br, fr := c.curFreeRatio()
Expand Down Expand Up @@ -551,8 +556,12 @@ func (cache *cacheStore) remove(key string) {
}
cache.Unlock()
if path != "" {
_ = cache.removeFile(path)
_ = cache.removeStage(key)
if err := cache.removeFile(path); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", path, err)
}
if err := cache.removeStage(key); err != nil && !os.IsNotExist(err) {
logger.Warnf("remove %s failed: %s", cache.stagePath(key), err)
}
}
}

Expand Down Expand Up @@ -645,12 +654,18 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {
func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string, error) {
stagingPath := cache.stagePath(key)
if cache.stageFull {
return stagingPath, errors.New("space not enough on device")
return stagingPath, errStageFull
}
if cache.maxStageWrite != 0 && stagingBlocks.Load() > int64(cache.maxStageWrite) {
return stagingPath, errStageConcurrency
}
stagingBlocks.Add(1)
defer stagingBlocks.Add(-1)
err := cache.flushPage(stagingPath, data)
if err == nil {
cache.m.stageBlocks.Add(1)
cache.m.stageBlockBytes.Add(float64(len(data)))
cache.m.stageWriteBytes.Add(float64(len(data)))
if cache.capacity > 0 && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
Expand Down Expand Up @@ -845,7 +860,7 @@ func (cache *cacheStore) scanCached() {

cache.Lock()
cache.scanned = true
logger.Debugf("Found %d cached blocks (%d bytes) in %s with %s", len(cache.keys), cache.used, cache.dir, time.Since(start))
logger.Debugf("Found %d cached blocks (%s) in %s with %s", len(cache.keys), humanize.IBytes(uint64(cache.used)), cache.dir, time.Since(start))
cache.Unlock()
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/chunk/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package chunk

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)

// CacheManager Metrics
type cacheManagerMetrics struct {
Expand All @@ -27,6 +29,7 @@ type cacheManagerMetrics struct {
cacheWriteHist prometheus.Histogram
stageBlocks prometheus.Gauge
stageBlockBytes prometheus.Gauge
stageWriteBytes prometheus.Counter
}

func newCacheManagerMetrics(reg prometheus.Registerer) *cacheManagerMetrics {
Expand All @@ -45,6 +48,13 @@ func (c *cacheManagerMetrics) registerMetrics(reg prometheus.Registerer) {
reg.MustRegister(c.cacheWriteBytes)
reg.MustRegister(c.stageBlocks)
reg.MustRegister(c.stageBlockBytes)
reg.MustRegister(c.stageWriteBytes)
reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "staging_writing_blocks",
Help: "Number of writing blocks in staging.",
}, func() float64 {
return float64(stagingBlocks.Load())
}))
}
}

Expand Down Expand Up @@ -78,4 +88,8 @@ func (c *cacheManagerMetrics) initMetrics() {
Name: "staging_block_bytes",
Help: "Total bytes of blocks in the staging path.",
})
c.stageWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "staging_write_bytes",
Help: "write bytes of blocks in the staging path.",
})
}
Loading