From 635527784ae72d1ceaa3fe393a283fab614f156f Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Thu, 30 May 2024 16:14:03 +0800 Subject: [PATCH] meta: fix protection check for compaction (#4901) --- pkg/meta/base_test.go | 20 ++++++++++++++++++++ pkg/meta/redis.go | 32 +++++++++++++++++--------------- pkg/meta/sql.go | 30 ++++++++++++++++-------------- pkg/meta/tkv.go | 30 ++++++++++++++++-------------- 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index 6df45774ef01..193e1b825f0e 100644 --- a/pkg/meta/base_test.go +++ b/pkg/meta/base_test.go @@ -1304,6 +1304,26 @@ func testCompaction(t *testing.T, m Meta, trash bool) { if len(slices) != 1 || slices[0].Len != 3<<20 { t.Fatalf("inode %d should be compacted, but have %d slices, size %d", inode, len(slices), slices[0].Len) } + + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 2, 0, Slice{Id: sliceId, Size: 2338508, Len: 2338508}, time.Now()) + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 2, 8829056, Slice{Id: sliceId, Size: 1074933, Len: 1074933}, time.Now()) + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 2, 7663608, Slice{Id: sliceId, Size: 41480, Len: 4148}, time.Now()) + _ = m.Fallocate(ctx, inode, fallocZeroRange, 2*ChunkSize+4515328, 3152428, nil) + _ = m.Fallocate(ctx, inode, fallocZeroRange, 2*ChunkSize+4515328, 2607724, nil) + if c, ok := m.(compactor); ok { + c.compactChunk(inode, 2, true) + } + if st := m.Read(ctx, inode, 2, &slices); st != 0 { + t.Fatalf("read 1: %s", st) + } + // compact twice: 4515328+2607724-2338508 = 4784544; 8829056+1074933-2338508-4784544=2780937 + if len(slices) != 3 || slices[0].Len != 2338508 || slices[1].Len != 4784544 || slices[2].Len != 2780937 { + t.Fatalf("inode %d should be compacted, but have %d slices, size %d,%d,%d", + inode, len(slices), slices[0].Len, slices[1].Len, slices[2].Len) + } } func testConcurrentWrite(t *testing.T, m Meta) { diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 2c4be7f96afb..8462ba820701 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2918,17 +2918,19 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { return } skipped := skipSome(ss) - var first, last *slice - if skipped > 0 { - first, last = ss[0], ss[skipped-1] - } - ss = ss[skipped:] - pos, size, slices := compactChunk(ss) - if len(ss) < 2 || size == 0 { + compacted := ss[skipped:] + pos, size, slices := compactChunk(compacted) + if len(compacted) < 2 || size == 0 { return } - if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos { - panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size)) + for _, s := range ss[:skipped] { + if pos+size > s.pos && s.pos+s.len > pos { + var sstring string + for _, s := range ss { + sstring += fmt.Sprintf("\n%+v", *s) + } + panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) + } } var id uint64 @@ -2936,11 +2938,11 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { if st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size) + logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) err = m.newMsg(CompactChunk, slices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") { - logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err) + logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) } return } @@ -2948,13 +2950,13 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { var rs []*redis.IntCmd // trash disabled: check reference of slices trash := m.toTrash(0) if trash { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...) } } } else { - rs = make([]*redis.IntCmd, len(ss)) + rs = make([]*redis.IntCmd, len(compacted)) } key := m.chunkKey(inode, indx) errno := errno(m.txn(ctx, func(tx *redis.Tx) error { @@ -2983,7 +2985,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { pipe.HSet(ctx, m.delSlices(), fmt.Sprintf("%d_%d", id, time.Now().Unix()), buf) } } else { - for i, s := range ss { + for i, s := range compacted { if s.id > 0 { rs[i] = pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.id, s.size), -1) } @@ -3010,7 +3012,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { m.of.InvalidateChunk(inode, indx) m.cleanupZeroRef(m.sliceKey(id, size)) if !trash { - for i, s := range ss { + for i, s := range compacted { if s.id > 0 && rs[i].Err() == nil && rs[i].Val() < 0 { m.deleteSlice(s.id, s.size) } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 6ff836bcfc23..0ece3e18aadd 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -2774,17 +2774,19 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { return } skipped := skipSome(ss) - var first, last *slice - if skipped > 0 { - first, last = ss[0], ss[skipped-1] - } - ss = ss[skipped:] - pos, size, slices := compactChunk(ss) - if len(ss) < 2 || size == 0 { + compacted := ss[skipped:] + pos, size, slices := compactChunk(compacted) + if len(compacted) < 2 || size == 0 { return } - if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos { - panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size)) + for _, s := range ss[:skipped] { + if pos+size > s.pos && s.pos+s.len > pos { + var sstring string + for _, s := range ss { + sstring += fmt.Sprintf("\n%+v", *s) + } + panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) + } } var id uint64 @@ -2792,18 +2794,18 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { if st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size) + logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) err = m.newMsg(CompactChunk, slices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") { - logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err) + logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) } return } var buf []byte trash := m.toTrash(0) if trash { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...) } @@ -2835,7 +2837,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { } } } else { - for _, s_ := range ss { + for _, s_ := range compacted { if s_.id == 0 { continue } @@ -2871,7 +2873,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { } else if err == nil { m.of.InvalidateChunk(inode, indx) if !trash { - for _, s := range ss { + for _, s := range compacted { if s.id == 0 { continue } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 78c89e51aba9..44acc352bae3 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2399,17 +2399,19 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { return } skipped := skipSome(ss) - var first, last *slice - if skipped > 0 { - first, last = ss[0], ss[skipped-1] - } - ss = ss[skipped:] - pos, size, slices := compactChunk(ss) - if len(ss) < 2 || size == 0 { + compacted := ss[skipped:] + pos, size, slices := compactChunk(compacted) + if len(compacted) < 2 || size == 0 { return } - if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos { - panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size)) + for _, s := range ss[:skipped] { + if pos+size > s.pos && s.pos+s.len > pos { + var sstring string + for _, s := range ss { + sstring += fmt.Sprintf("\n%+v", *s) + } + panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) + } } var id uint64 @@ -2417,18 +2419,18 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { if st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size) + logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) err = m.newMsg(CompactChunk, slices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") { - logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err) + logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) } return } var dsbuf []byte trash := m.toTrash(0) if trash { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { dsbuf = append(dsbuf, m.encodeDelayedSlice(s.id, s.size)...) } @@ -2450,7 +2452,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { tx.set(m.delSliceKey(time.Now().Unix(), id), dsbuf) } } else { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { tx.incrBy(m.sliceKey(s.id, s.size), -1) } @@ -2480,7 +2482,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { m.cleanupZeroRef(id, size) if !trash { var refs int64 - for _, s := range ss { + for _, s := range compacted { if s.id > 0 && m.client.txn(func(tx *kvTxn) error { refs = tx.incrBy(m.sliceKey(s.id, s.size), 0) return nil