Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chunk: add checksum for cache files #2626

Merged
merged 12 commits
Sep 9, 2022
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func clientFlags() []cli.Flag {
Name: "cache-partial-only",
Usage: "cache only random/small read",
},
&cli.StringFlag{
Name: "verify-cache-checksum",
Value: "full",
Usage: "checksum level (none, full, shrink, extend)",
},
&cli.StringFlag{
Name: "cache-scan-interval",
Value: "3600",
Expand Down
5 changes: 5 additions & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
FreeSpace: float32(c.Float64("free-space-ratio")),
CacheMode: os.FileMode(0600),
CacheFullBlock: !c.Bool("cache-partial-only"),
CacheChecksum: c.String("verify-cache-checksum"),
CacheScanInterval: duration(c.String("cache-scan-interval")),
AutoCreate: true,
}
Expand All @@ -352,6 +353,10 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
}
chunkConf.CacheDir = strings.Join(ds, string(os.PathListSeparator))
}
if cs := []string{chunk.CsNone, chunk.CsFull, chunk.CsShrink, chunk.CsExtend}; !utils.StringContains(cs, chunkConf.CacheChecksum) {
logger.Warnf("verify-cache-checksum should be one of %v", cs)
chunkConf.CacheChecksum = chunk.CsFull
}
return chunkConf
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/warmup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestWarmup(t *testing.T) {
time.Sleep(2 * time.Second)
filePath = fmt.Sprintf("%s/%s/raw/chunks/0/0/1_0_4", cacheDir, uuid)
content, err := os.ReadFile(filePath)
if err != nil || string(content) != "test" {
if err != nil || len(content) < 4 || string(content[:4]) != "test" {
t.Fatalf("warmup: %s; got content %s", err, content)
}
}
7 changes: 4 additions & 3 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ type Config struct {
CacheDir string
CacheMode os.FileMode
CacheSize int64
CacheChecksum string
CacheScanInterval time.Duration
FreeSpace float32
AutoCreate bool
Expand Down Expand Up @@ -809,7 +810,8 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
logger.Debugf("Key %s is not needed, drop it", key)
return
}
f, err := os.Open(stagingPath)
blen := parseObjOrigSize(key)
f, err := openCacheFile(stagingPath, blen, store.conf.CacheChecksum)
if err != nil {
store.pendingMutex.Lock()
_, ok = store.pendingKeys[key]
Expand All @@ -821,9 +823,8 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
}
return
}
blen := parseObjOrigSize(key)
block := NewOffPage(blen)
_, err = io.ReadFull(f, block.Data)
_, err = f.ReadAt(block.Data, 0)
_ = f.Close()
if err != nil {
block.Release()
Expand Down
23 changes: 13 additions & 10 deletions pkg/chunk/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,19 @@ func testStore(t *testing.T, store ChunkStore) {
}

// defaultConf is the shared Config used by the chunk-store tests.
// Checksum verification is off by default (CsNone); individual tests
// flip the level on the store as needed.
var defaultConf = Config{
	BlockSize:         1 << 20,
	CacheDir:          filepath.Join(os.TempDir(), "diskCache"),
	CacheMode:         0600,
	CacheSize:         10,
	CacheChecksum:     CsNone,
	CacheScanInterval: time.Second * 300,
	MaxUpload:         1,
	MaxDeletes:        1,
	MaxRetries:        10,
	PutTimeout:        time.Second,
	GetTimeout:        time.Second * 2,
	AutoCreate:        true,
	BufferSize:        10 << 20,
}

func TestStoreDefault(t *testing.T) {
Expand Down
146 changes: 144 additions & 2 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package chunk

import (
"errors"
"fmt"
"hash/crc32"
"hash/fnv"
"io"
"math"
Expand Down Expand Up @@ -66,6 +68,7 @@ type cacheStore struct {
keys map[string]cacheItem
scanned bool
full bool
checksum string // checksum level
uploader func(key, path string, force bool) bool
}

Expand All @@ -82,6 +85,7 @@ func newCacheStore(m *cacheManager, dir string, cacheSize int64, pendingPages in
mode: config.CacheMode,
capacity: cacheSize,
freeRatio: config.FreeSpace,
checksum: config.CacheChecksum,
scanInterval: config.CacheScanInterval,
keys: make(map[string]cacheItem),
pending: make(chan pendingFile, pendingPages),
Expand Down Expand Up @@ -200,6 +204,13 @@ func (cache *cacheStore) flushPage(path string, data []byte) (err error) {
_ = f.Close()
return
}
if cache.checksum != CsNone {
if _, err = f.Write(checksum(data)); err != nil {
logger.Warnf("Write checksum to cache file %s failed: %s", tmp, err)
_ = f.Close()
return
}
}
if err = f.Close(); err != nil {
logger.Warnf("Close cache file %s failed: %s", tmp, err)
return
Expand Down Expand Up @@ -262,7 +273,7 @@ func (cache *cacheStore) load(key string) (ReadCloser, error) {
return nil, errors.New("not cached")
}
cache.Unlock()
f, err := os.Open(cache.cachePath(key))
f, err := openCacheFile(cache.cachePath(key), parseObjOrigSize(key), cache.checksum)
cache.Lock()
if err == nil {
if it, ok := cache.keys[key]; ok {
Expand Down Expand Up @@ -739,7 +750,7 @@ func (m *cacheManager) cache(key string, p *Page, force bool) {
}

// ReadCloser is the read-side interface for cached blocks: positioned
// reads via io.ReaderAt plus io.Closer. Sequential io.Reader was dropped
// from the interface; callers read with ReadAt instead so that checksum
// verification can work on explicit (offset, length) ranges.
type ReadCloser interface {
	io.ReaderAt
	io.Closer
}
Expand All @@ -763,3 +774,134 @@ func (m *cacheManager) stagePath(key string) string {
func (m *cacheManager) uploaded(key string, size int) {
m.getStore(key).uploaded(key, size)
}

/* --- Checksum --- */

// Checksum verification levels for cached block files (see cacheFile.ReadAt
// for how each level affects read-time verification).
const (
	CsNone   = "none"   // no verification at all
	CsFull   = "full"   // verify only when the whole block is read at once (off == 0 && len == length)
	CsShrink = "shrink" // verify only the csBlock-aligned part of the requested range
	CsExtend = "extend" // extend the read to csBlock boundaries and verify everything read

	csBlock = 32 << 10 // checksum granularity: one 4-byte CRC per 32 KiB of data
)

// crc32c is the Castagnoli CRC-32 table used for all cache-file checksums.
var crc32c = crc32.MakeTable(crc32.Castagnoli)

// cacheFile is an open cache file plus the metadata needed to verify reads.
type cacheFile struct {
	*os.File
	length  int    // length of data (the file may hold extra trailing checksum bytes)
	csLevel string // one of CsNone/CsFull/CsShrink/CsExtend
}

// checksum computes a CRC-32C (Castagnoli) over every csBlock-sized chunk
// of data and returns the concatenated 4-byte sums — e.g. 512 bytes of
// checksums for a 4 MiB block.
func checksum(data []byte) []byte {
	total := len(data)
	buf := utils.NewBuffer(uint32((total-1)/csBlock+1) * 4)
	for off := 0; off < total; off += csBlock {
		chunk := data[off:]
		if len(chunk) > csBlock {
			chunk = chunk[:csBlock]
		}
		buf.Put32(crc32.Checksum(chunk, crc32c))
	}
	return buf.Bytes()
}

// openCacheFile opens a cached block file and decides, from the difference
// between the on-disk size and the expected data length, whether the file
// carries trailing checksums. A file written without checksums is returned
// with level CsNone regardless of the requested level; any other size
// mismatch is treated as corruption.
func openCacheFile(name string, length int, level string) (*cacheFile, error) {
	fp, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	st, err := fp.Stat()
	if err != nil {
		_ = fp.Close()
		return nil, err
	}
	extra := st.Size() - int64(length)
	if extra == 0 {
		// bare data, no checksum section
		return &cacheFile{fp, length, CsNone}, nil
	}
	if extra == int64(((length-1)/csBlock+1)*4) {
		// data followed by one 4-byte CRC per csBlock chunk
		return &cacheFile{fp, length, level}, nil
	}
	_ = fp.Close()
	return nil, fmt.Errorf("invalid file size %d, data length %d", st.Size(), length)
}

func (cf *cacheFile) ReadAt(b []byte, off int64) (n int, err error) {
logger.Tracef("CacheFile length %d level %s, readat off %d buffer size %d", cf.length, cf.csLevel, off, len(b))
defer func() {
logger.Tracef("CacheFile readat returns n %d err %s", n, err)
}()
if cf.csLevel == CsNone || cf.csLevel == CsFull && (off != 0 || len(b) != cf.length) {
return cf.File.ReadAt(b, off)
}
var rb = b // read buffer
var roff = off // read offset
if cf.csLevel == CsExtend {
roff = off / csBlock * csBlock
rend := int(off) + len(b)
if rend%csBlock != 0 {
rend = (rend/csBlock + 1) * csBlock
if rend > cf.length {
rend = cf.length
}
}
if size := rend - int(roff); size != len(b) {
p := NewOffPage(size)
rb = p.Data
defer func() {
if err == nil {
n = copy(b, rb[off-roff:])
} else {
n = 0 // TODO: adjust n and b
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
}
p.Release()
}()
}
}
if n, err = cf.File.ReadAt(rb, roff); err != nil {
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
return
}

ioff := int(roff) / csBlock // index offset
if cf.csLevel == CsShrink {
if roff%csBlock != 0 {
if o := csBlock - int(roff)%csBlock; len(rb) <= o {
return
} else {
rb = rb[o:]
ioff += 1
}
}
if end := int(roff) + n; end != cf.length && end%csBlock != 0 {
if len(rb) <= end%csBlock {
return
}
rb = rb[:len(rb)-end%csBlock]
}
}
// now rb contains the data to check
length := len(rb)
buf := utils.NewBuffer(uint32((length-1)/csBlock+1) * 4)
if _, err = cf.File.ReadAt(buf.Bytes(), int64(cf.length+ioff*4)); err != nil {
SandyXSD marked this conversation as resolved.
Show resolved Hide resolved
logger.Warnf("Read checksum of data length %d checksum offset %d: %s", length, cf.length+ioff*4, err)
return
}
for start, end := 0, 0; start < length; start = end {
end = start + csBlock
if end > length {
end = length
}
sum := crc32.Checksum(rb[start:end], crc32c)
expect := buf.Get32()
logger.Debugf("Cache file read data start %d end %d checksum %d, expected %d", start, end, sum, expect)
if sum != expect {
err = fmt.Errorf("data checksum %d != expect %d", sum, expect)
break
}
}
return
}
87 changes: 87 additions & 0 deletions pkg/chunk/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package chunk

import (
"math/rand"
"os"
"path/filepath"
"testing"
Expand All @@ -30,6 +31,92 @@ func TestNewCacheStore(t *testing.T) {
}
}

// TestChecksum exercises cache-file reads under all four checksum levels.
// It seeds five cache entries (keys look like "<id>_<indx>_<size>"; the last
// field is presumably the data length parsed by parseObjOrigSize — TODO confirm):
// one written without checksums, three written with checksums, and one (k4)
// whose on-disk data is deliberately corrupted after its checksums were
// computed. It then verifies which (level, read-range) combinations detect
// the corruption.
func TestChecksum(t *testing.T) {
	m := newCacheManager(&defaultConf, nil, nil)
	s := m.(*cacheManager).stores[0]
	k1 := "0_0_10" // no checksum
	k2 := "1_0_10"
	k3 := "2_1_102400"
	k4 := "3_5_102400" // corrupt data
	k5 := "4_8_1048576"

	// k1 cached while s.checksum is still the default (CsNone in defaultConf):
	// file has no trailing checksum section.
	p := NewPage([]byte("helloworld"))
	s.cache(k1, p, true)

	s.checksum = CsFull
	s.cache(k2, p, true)

	buf := make([]byte, 102400)
	_, _ = rand.Read(buf)
	s.cache(k3, NewPage(buf), true)

	// Build k4 by hand: write the ORIGINAL data, then zero 96K~100K before
	// computing checksums, so the stored CRCs disagree with the stored data
	// in that range.
	fpath := s.cachePath(k4)
	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE, s.mode)
	if err != nil {
		t.Fatalf("Create cache file %s: %s", fpath, err)
	}
	if _, err = f.Write(buf); err != nil {
		_ = f.Close()
		t.Fatalf("Write cache file %s: %s", fpath, err)
	}
	for i := 98304; i < 102400; i++ { // reset 96K ~ 100K
		buf[i] = 0
	}
	if _, err = f.Write(checksum(buf)); err != nil {
		_ = f.Close()
		t.Fatalf("Write checksum to cache file %s: %s", fpath, err)
	}
	_ = f.Close()

	buf = make([]byte, 1048576)
	_, _ = rand.Read(buf)
	s.cache(k5, NewPage(buf), true)
	time.Sleep(time.Second * 5) // wait for cache file flushed

	// check returns true when loading key and reading [off, off+size) succeeds
	// (i.e. no checksum error was reported).
	check := func(key string, off int64, size int) bool {
		rc, err := s.load(key)
		if err != nil {
			t.Fatalf("CacheStore load key %s: %s", key, err)
		}
		defer rc.Close()
		buf := make([]byte, size)
		_, err = rc.ReadAt(buf, off)
		return err == nil
	}
	cases := []struct {
		key    string
		off    int64
		size   int
		expect bool
	}{
		{k1, 0, 10, true},
		{k1, 3, 5, true},
		{k2, 0, 10, true},
		{k2, 3, 5, true},
		{k3, 0, 102400, true},
		{k3, 8192, 92160, true}, // 8K ~ 98K
		{k4, 0, 102400, true},
		{k4, 8192, 92160, true}, // only CsExtend can detect the error
		{k5, 0, 1048576, true},
		{k5, 131072, 131072, true},
		{k5, 102400, 512000, true},
	}
	// NOTE: the expectation flips below are cumulative on purpose — once a
	// stricter level marks a case as failing, it stays false for the
	// remaining (stricter) levels in the loop.
	for _, l := range []string{CsNone, CsFull, CsShrink, CsExtend} {
		s.checksum = l
		if l != CsNone {
			cases[6].expect = false
		}
		if l == CsExtend {
			cases[7].expect = false
		}
		for _, c := range cases {
			if check(c.key, c.off, c.size) != c.expect {
				t.Fatalf("CacheStore check level %s case %+v", l, c)
			}
		}
	}
}

func TestExpand(t *testing.T) {
rs := expandDir("/not/exists/jfsCache")
if len(rs) != 1 || rs[0] != "/not/exists/jfsCache" {
Expand Down
Loading