Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

umount: wait for writting staging chunks #3224

Merged
merged 7 commits into from
Feb 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 100 additions & 3 deletions cmd/umount.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package cmd

import (
"errors"
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"time"

"github.com/dustin/go-humanize"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
)

Expand All @@ -43,6 +48,10 @@ $ juicefs umount /mnt/jfs`,
Aliases: []string{"f"},
Usage: "unmount a busy mount point by force",
},
&cli.BoolFlag{
Name: "flush",
Usage: "wait for all staging chunks to be flushed",
},
},
}
}
Expand Down Expand Up @@ -90,6 +99,94 @@ func doUmount(mp string, force bool) error {
func umount(ctx *cli.Context) error {
setup(ctx, 1)
mp := ctx.Args().Get(0)
force := ctx.Bool("force")
return doUmount(mp, force)
if ctx.Bool("flush") {
raw, err := os.ReadFile(path.Join(mp, ".config"))
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("not a JuiceFS mount point")
}
return errors.Wrap(err, "failed to read config")
}

var conf vfs.Config
if err = json.Unmarshal(raw, &conf); err != nil {
return errors.Wrap(err, "failed to parse config")
}
if conf.Chunk.Writeback {
stagingDir := path.Join(conf.Chunk.CacheDir, "rawstaging")
if err := waitWritebackComplete(stagingDir); err != nil {
return err
}
defer func() {
size, _ := fileSizeInDir(stagingDir)
clearLastLine()
if size == 0 {
fmt.Println("\rAll staging chunks are flushed")
} else {
fmt.Printf("\r%s staging chunks are not flushed\n", humanize.IBytes(size))
}
}()
}
}
return doUmount(mp, ctx.Bool("force"))
}

func waitWritebackComplete(stagingDir string) error {
lastLeft := uint64(0)
for {
_, err := os.Stat(stagingDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrap(err, "failed to read staging directory")
}
start := time.Now()
size, err := fileSizeInDir(stagingDir)
if err != nil {
if os.IsNotExist(err) {
continue
}
return errors.Wrap(err, "failed to read staging directory")
}
if lastLeft == 0 {
lastLeft = size
}

if size == 0 && lastLeft == 0 {
return nil
}

speed := uint64(0)
if lastLeft > size {
speed = lastLeft - size
}

leftTime := 720 * time.Hour
if speed != 0 {
leftTime = time.Duration(size/speed) * time.Second
}
clearLastLine()
fmt.Printf("\r%s staging chunks are being flushed... %s/s, left %s", humanize.IBytes(size), humanize.IBytes(speed), leftTime)
lastLeft = size
time.Sleep(time.Second - time.Since(start))
}
}

func fileSizeInDir(dir string) (uint64, error) {
var size uint64
err := filepath.Walk(dir, func(name string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += uint64(info.Size())
}
return nil
})
return size, err
}

func clearLastLine() {
fmt.Printf("\r ")
}