Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chunk: add checksum for cache files #2626

Merged
merged 12 commits into from
Sep 9, 2022
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func clientFlags() []cli.Flag {
Name: "cache-partial-only",
Usage: "cache only random/small read",
},
&cli.IntFlag{
Name: "checksum",
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
Value: 1,
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
Usage: "checksum level (0: disable, 1: only full, 2: shrink, 3: extend)",
},
&cli.StringFlag{
Name: "backup-meta",
Value: "3600",
Expand Down
1 change: 1 addition & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
FreeSpace: float32(c.Float64("free-space-ratio")),
CacheMode: os.FileMode(0600),
CacheFullBlock: !c.Bool("cache-partial-only"),
Checksum: c.Int("checksum"),
AutoCreate: true,
}
if chunkConf.MaxUpload <= 0 {
Expand Down
2 changes: 1 addition & 1 deletion cmd/warmup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestWarmup(t *testing.T) {
time.Sleep(2 * time.Second)
filePath = fmt.Sprintf("%s/%s/raw/chunks/0/0/1_0_4", cacheDir, uuid)
content, err := os.ReadFile(filePath)
if err != nil || string(content) != "test" {
if err != nil || len(content) < 4 || string(content[:4]) != "test" {
t.Fatalf("warmup: %s; got content %s", err, content)
}
}
7 changes: 4 additions & 3 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ type Config struct {
BufferSize int
Readahead int
Prefetch int
Checksum int
}

type cachedStore struct {
Expand Down Expand Up @@ -808,7 +809,8 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
logger.Debugf("Key %s is not needed, drop it", key)
return
}
f, err := os.Open(stagingPath)
blen := parseObjOrigSize(key)
f, err := openCacheFile(stagingPath, blen, store.conf.Checksum)
if err != nil {
store.pendingMutex.Lock()
_, ok = store.pendingKeys[key]
Expand All @@ -820,9 +822,8 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
}
return
}
blen := parseObjOrigSize(key)
block := NewOffPage(blen)
_, err = io.ReadFull(f, block.Data)
_, err = f.ReadAt(block.Data, 0)
_ = f.Close()
if err != nil {
block.Release()
Expand Down
144 changes: 142 additions & 2 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package chunk

import (
"errors"
"fmt"
"hash/crc32"
"hash/fnv"
"io"
"math"
Expand Down Expand Up @@ -64,6 +66,7 @@ type cacheStore struct {
keys map[string]cacheItem
scanned bool
full bool
checksum int // checksum level
uploader func(key, path string, force bool) bool
}

Expand All @@ -83,6 +86,7 @@ func newCacheStore(m *cacheManager, dir string, cacheSize int64, pendingPages in
keys: make(map[string]cacheItem),
pending: make(chan pendingFile, pendingPages),
pages: make(map[string]*Page),
checksum: config.Checksum,
uploader: uploader,
}
c.createDir(c.dir)
Expand Down Expand Up @@ -194,6 +198,13 @@ func (cache *cacheStore) flushPage(path string, data []byte) (err error) {
_ = f.Close()
return
}
if cache.checksum > 0 {
if _, err = f.Write(checksum(data)); err != nil {
logger.Warnf("Write checksum to cache file %s failed: %s", tmp, err)
_ = f.Close()
return
}
}
if err = f.Close(); err != nil {
logger.Warnf("Close cache file %s failed: %s", tmp, err)
return
Expand Down Expand Up @@ -256,7 +267,7 @@ func (cache *cacheStore) load(key string) (ReadCloser, error) {
return nil, errors.New("not cached")
}
cache.Unlock()
f, err := os.Open(cache.cachePath(key))
f, err := openCacheFile(cache.cachePath(key), parseObjOrigSize(key), cache.checksum)
cache.Lock()
if err == nil {
if it, ok := cache.keys[key]; ok {
Expand Down Expand Up @@ -723,7 +734,7 @@ func (m *cacheManager) cache(key string, p *Page, force bool) {
}

type ReadCloser interface {
io.Reader
// io.Reader
io.ReaderAt
io.Closer
}
Expand All @@ -747,3 +758,132 @@ func (m *cacheManager) stagePath(key string) string {
// uploaded forwards the uploaded(key, size) notification to the store
// responsible for this key.
func (m *cacheManager) uploaded(key string, size int) {
	store := m.getStore(key)
	store.uploaded(key, size)
}

/* --- Checksum --- */
// Cache-file checksum levels, matching the `--checksum` flag
// (0: disable, 1: only full, 2: shrink, 3: extend).
const (
	csNone   = iota // no verification
	csFull          // verify only reads that cover the whole data region
	csShrink        // verify the block-aligned part of a partial read
	csExtend        // widen a partial read to block boundaries and verify all of it

	// csBlock is the checksum granularity: one 4-byte CRC32 per 32 KiB of data.
	csBlock = 32 << 10
)

// cacheFile is an open cache file whose data region may be followed on disk
// by per-block checksums (see checksum()); ReadAt verifies them according
// to csLevel.
type cacheFile struct {
	*os.File
	length  int // length of data
	csLevel int // checksum level: csNone/csFull/csShrink/csExtend
}

// Calculate 32-bits checksum for every 32 KiB data, so 512 Bytes for 4 MiB in total
func checksum(data []byte) []byte {
length := len(data)
buf := utils.NewBuffer(uint32((length-1)/csBlock+1) * 4)
for start, end := 0, 0; start < length; start = end {
end = start + csBlock
if end > length {
end = length
}
sum := crc32.ChecksumIEEE(data[start:end])
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
buf.Put32(sum)
}
return buf.Bytes()
}

// openCacheFile opens a cache file and decides, from its on-disk size, whether
// it carries trailing per-block checksums. A file of exactly `length` bytes
// has no checksum; one longer by exactly the expected checksum size is
// verified at the given level; any other size is rejected as invalid.
func openCacheFile(name string, length int, level int) (*cacheFile, error) {
	f, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	st, err := f.Stat()
	if err != nil {
		_ = f.Close()
		return nil, err
	}
	extra := st.Size() - int64(length)
	if extra == 0 {
		// Raw data only: written before checksums existed or with them disabled.
		return &cacheFile{f, length, csNone}, nil
	}
	if extra == int64(((length-1)/csBlock+1)*4) {
		// Data followed by one 4-byte CRC32 per csBlock.
		return &cacheFile{f, length, level}, nil
	}
	_ = f.Close()
	return nil, fmt.Errorf("invalid file size %d, data length %d", st.Size(), length)
}

// ReadAt fills b with cached data starting at off, verifying per-block CRC32
// checksums according to cf.csLevel. Checksums are stored after the data
// region, i.e. at file offset cf.length, 4 bytes per csBlock of data.
//   - csNone: plain read, no verification
//   - csFull: verify only when the caller reads the whole data region at once
//   - csShrink: verify just the block-aligned middle of a partial read,
//     skipping partial head/tail blocks
//   - csExtend: widen the read to csBlock boundaries so every touched block
//     can be verified, then copy the requested window back into b
func (cf *cacheFile) ReadAt(b []byte, off int64) (n int, err error) {
	logger.Tracef("CacheFile length %d level %d, readat off %d buffer size %d", cf.length, cf.csLevel, off, len(b))
	defer func() {
		logger.Tracef("CacheFile readat returns n %d err %s", n, err)
	}()
	// Fast path: verification disabled, or csFull with a partial read
	// (which csFull does not cover).
	if cf.csLevel == csNone || cf.csLevel == csFull && (off != 0 || len(b) != cf.length) {
		return cf.File.ReadAt(b, off)
	}
	var rb = b     // read buffer (may be swapped for a wider scratch buffer below)
	var roff = off // read offset (block-aligned for csExtend)
	if cf.csLevel == csExtend {
		// Round the read range outwards to csBlock boundaries, capped at the
		// end of the data region, so only complete blocks are checksummed.
		roff = off / csBlock * csBlock
		rend := int(off) + len(b)
		if rend%csBlock != 0 {
			rend = (rend/csBlock + 1) * csBlock
			if rend > cf.length {
				rend = cf.length
			}
		}
		if size := rend - int(roff); size != len(b) {
			p := NewOffPage(size)
			rb = p.Data
			defer func() {
				if err == nil {
					// Copy the requested window out of the widened read.
					n = copy(b, rb[off-roff:])
				} else {
					n = 0 // TODO: adjust n and b
				}
				p.Release()
			}()
		}
	}
	if n, err = cf.File.ReadAt(rb, roff); err != nil {
		return
	}

	ioff := int(roff) / csBlock // index offset of the first checksum entry
	if cf.csLevel == csShrink {
		// Drop a partial leading block: it cannot be verified on its own.
		if roff%csBlock != 0 {
			if o := csBlock - int(roff)%csBlock; len(rb) <= o {
				return // read lies entirely inside one partial block; skip verification
			} else {
				rb = rb[o:]
				ioff += 1
			}
		}
		// Drop a partial trailing block unless it ends exactly at the end of
		// the data region (the last block may legitimately be short).
		if end := int(roff) + n; end != cf.length && end%csBlock != 0 {
			if len(rb) <= end%csBlock {
				return
			}
			rb = rb[:len(rb)-end%csBlock]
		}
	}
	// now rb contains the data to check
	length := len(rb)
	buf := utils.NewBuffer(uint32((length-1)/csBlock+1) * 4)
	// Read the stored checksums: they start at cf.length, 4 bytes per block.
	if _, err = cf.File.ReadAt(buf.Bytes(), int64(cf.length+ioff*4)); err != nil {
		logger.Warnf("Read checksum of data length %d checksum offset %d: %s", length, cf.length+ioff*4, err)
		return
	}
	// Recompute the CRC32 of each csBlock slice and compare with the stored value.
	for start, end := 0, 0; start < length; start = end {
		end = start + csBlock
		if end > length {
			end = length
		}
		sum := crc32.ChecksumIEEE(rb[start:end])
		expect := buf.Get32()
		logger.Debugf("Cache file read data start %d end %d checksum %d, expected %d", start, end, sum, expect)
		if sum != expect {
			err = fmt.Errorf("data checksum %d != expect %d", sum, expect)
			break
		}
	}
	return
}