Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

umount: wait for writting staging chunks #3224

Merged
merged 7 commits into from
Feb 10, 2023
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 101 additions & 3 deletions cmd/umount.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package cmd

import (
"errors"
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"time"

"github.com/dustin/go-humanize"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
)

Expand All @@ -43,6 +48,11 @@ $ juicefs umount /mnt/jfs`,
Aliases: []string{"f"},
Usage: "unmount a busy mount point by force",
},
&cli.BoolFlag{
Name: "writeback",
Aliases: []string{"w"},
Usage: "wait for all staging chunks to be written back",
},
},
}
}
Expand Down Expand Up @@ -90,6 +100,94 @@ func doUmount(mp string, force bool) error {
func umount(ctx *cli.Context) error {
setup(ctx, 1)
mp := ctx.Args().Get(0)
force := ctx.Bool("force")
return doUmount(mp, force)
if ctx.Bool("writeback") {
raw, err := os.ReadFile(path.Join(mp, ".config"))
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("not a JuiceFS mount point")
}
return errors.Wrap(err, "failed to read config")
}

var conf vfs.Config
if err = json.Unmarshal(raw, &conf); err != nil {
return errors.Wrap(err, "failed to parse config")
}
if conf.Chunk.Writeback {
stagingDir := path.Join(conf.Chunk.CacheDir, "rawstaging")
if err := waitWritebackComplete(stagingDir); err != nil {
return err
}
defer func() {
size, _ := fileSizeInDir(stagingDir)
clearLastLine()
if size == 0 {
fmt.Println("\rAll staging chunks are written back")
} else {
fmt.Printf("\r%s staging chunks are not written back\n", humanize.IBytes(size))
}
}()
}
}
return doUmount(mp, ctx.Bool("force"))
}

func waitWritebackComplete(stagingDir string) error {
lastLeft := uint64(0)
for {
_, err := os.Stat(stagingDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrap(err, "failed to read staging directory")
}
start := time.Now()
size, err := fileSizeInDir(stagingDir)
if err != nil {
if os.IsNotExist(err) {
continue
}
return errors.Wrap(err, "failed to read staging directory")
}
if lastLeft == 0 {
lastLeft = size
}

if size == 0 && lastLeft == 0 {
return nil
}

speed := uint64(0)
if lastLeft > size {
speed = lastLeft - size
}

leftTime := 720 * time.Hour
if speed != 0 {
leftTime = time.Duration(size/speed) * time.Second
}
clearLastLine()
fmt.Printf("\r%s staging chunks are being written back... %s/s, left %s", humanize.IBytes(size), humanize.IBytes(speed), leftTime)
zhijian-pro marked this conversation as resolved.
Show resolved Hide resolved
lastLeft = size
time.Sleep(time.Second - time.Since(start))
}
}

func fileSizeInDir(dir string) (uint64, error) {
var size uint64
err := filepath.Walk(dir, func(name string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += uint64(info.Size())
}
return nil
})
return size, err
}

func clearLastLine() {
fmt.Printf("\r ")
}