Skip to content

Commit a555ae1

Browse files
Chuckie12138y00699588
and
y00699588
authored
feat(storage): add check and truncate full text bf files (openGemini#613)
Signed-off-by: y00699588 <[email protected]> Co-authored-by: y00699588 <[email protected]>
1 parent e46692e commit a555ae1

File tree

4 files changed

+89
-13
lines changed

4 files changed

+89
-13
lines changed

engine/detached_check_metaInfo.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func NewDetachedMetaInfo() *DetachedMetaInfo {
4949
return &DetachedMetaInfo{}
5050
}
5151

52-
func (d *DetachedMetaInfo) checkAndTruncateDetachedFiles(dir, msName string, bfCols []string) error {
52+
func (d *DetachedMetaInfo) checkAndTruncateDetachedFiles(dir, msName string, bfCols []string, isFullTextIdx bool) error {
5353
filePath := d.getDetachedPrefixFilePath(dir, msName)
5454
//check detached metaIndex file
5555
err := d.checkAndTruncateMetaIndexFile(filePath)
@@ -82,7 +82,7 @@ func (d *DetachedMetaInfo) checkAndTruncateDetachedFiles(dir, msName string, bfC
8282
}
8383

8484
//check bloom filter files
85-
return d.checkAndTruncateBfFiles(dir, msName, bfCols)
85+
return d.checkAndTruncateBfFiles(dir, msName, bfCols, isFullTextIdx)
8686
}
8787

8888
func (d *DetachedMetaInfo) getDetachedPrefixFilePath(dir, msName string) string {
@@ -343,15 +343,19 @@ func (d *DetachedMetaInfo) checkAndTruncatePkIdxFile(filePath string) error {
343343
return nil
344344
}
345345

346-
func (d *DetachedMetaInfo) checkAndTruncateBfFiles(dir, msName string, bfCols []string) error {
346+
func (d *DetachedMetaInfo) checkAndTruncateBfFiles(dir, msName string, bfCols []string, isFullTextIdx bool) error {
347347
localPath := dir
348348
prefixDataPathLength := len(obs.GetPrefixDataPath())
349349
remotePath := localPath[prefixDataPathLength:]
350-
350+
cols := bfCols
351+
if isFullTextIdx {
352+
cols = append(cols, sparseindex.FullTextIndex)
353+
}
351354
//get bloom filter cols -> gen bloom filter files
352355
constant := logstore.GetConstant(logstore.CurrentLogTokenizerVersion)
353-
for i := range bfCols {
354-
fdr, fdl, err := d.getBloomFilterFile(remotePath, localPath, msName, bfCols[i])
356+
357+
for i := range cols {
358+
fdr, fdl, err := d.getBloomFilterFile(remotePath, localPath, msName, cols[i])
355359
if err != nil {
356360
return err
357361
}

engine/detached_check_metaInfo_test.go

+50-6
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestCheckDetachedFiles(t *testing.T) {
9191

9292
require.Equal(t, 4*100, int(sh.rowCount))
9393
detachedInfo := NewDetachedMetaInfo()
94-
err = detachedInfo.checkAndTruncateDetachedFiles(sh.filesPath, defaultMeasurementName, bfColumn)
94+
err = detachedInfo.checkAndTruncateDetachedFiles(sh.filesPath, defaultMeasurementName, bfColumn, false)
9595
if err != nil {
9696
t.Fatal(err)
9797
}
@@ -153,7 +153,7 @@ func TestTestCheckDetachedFilesV2(t *testing.T) {
153153
mutable.SetWriteChunk(msInfo, rec)
154154
sh.commitSnapshot(sh.activeTbl)
155155
detachedInfo := NewDetachedMetaInfo()
156-
err = detachedInfo.checkAndTruncateDetachedFiles(sh.filesPath, defaultMeasurementName, bfColumn)
156+
err = detachedInfo.checkAndTruncateDetachedFiles(sh.filesPath, defaultMeasurementName, bfColumn, false)
157157
if err != nil {
158158
t.Fatal(err)
159159
}
@@ -168,7 +168,7 @@ func TestTestCheckDetachedFilesV2(t *testing.T) {
168168
func TestCheckMetaIndexFile(t *testing.T) {
169169
testDir := t.TempDir()
170170
detachedInfo := NewDetachedMetaInfo()
171-
err := detachedInfo.checkAndTruncateDetachedFiles("unknownPath", "cpu", nil)
171+
err := detachedInfo.checkAndTruncateDetachedFiles("unknownPath", "cpu", nil, false)
172172
if err == nil {
173173
t.Fatal("should return open metaIndex file path error")
174174
}
@@ -349,7 +349,7 @@ func TestCheckBloomFilterFiles(t *testing.T) {
349349
testDir := t.TempDir()
350350
detachedInfo := NewDetachedMetaInfo()
351351
bfCols := []string{"tags"}
352-
err := detachedInfo.checkAndTruncateBfFiles("unknownPath", "", bfCols)
352+
err := detachedInfo.checkAndTruncateBfFiles("unknownPath", "", bfCols, false)
353353
if err == nil {
354354
t.Fatal("should return open bloom filter file path error")
355355
}
@@ -372,7 +372,7 @@ func TestCheckBloomFilterFiles(t *testing.T) {
372372
if err != nil {
373373
t.Fatal(err)
374374
}
375-
err = detachedInfo.checkAndTruncateBfFiles(testDir, "", bfCols)
375+
err = detachedInfo.checkAndTruncateBfFiles(testDir, "", bfCols, false)
376376
if err != nil {
377377
t.Fatal(err)
378378
}
@@ -382,7 +382,51 @@ func TestCheckBloomFilterFiles(t *testing.T) {
382382
if err != nil {
383383
t.Fatal(err)
384384
}
385-
err = detachedInfo.checkAndTruncateBfFiles(testDir, "", bfCols)
385+
err = detachedInfo.checkAndTruncateBfFiles(testDir, "", bfCols, false)
386+
if err != nil {
387+
t.Fatal(err)
388+
}
389+
_ = os.RemoveAll(testDir)
390+
}
391+
392+
func TestCheckFullTextBfFiles(t *testing.T) {
393+
testDir := t.TempDir()
394+
detachedInfo := NewDetachedMetaInfo()
395+
err := detachedInfo.checkAndTruncateBfFiles("unknownPath", "", nil, true)
396+
if err == nil {
397+
t.Fatal("should return open bloom filter file path error")
398+
}
399+
constant := logstore.GetConstant(logstore.CurrentLogTokenizerVersion)
400+
buf := make([]byte, constant.FilterDataDiskSize*2)
401+
lock := fileops.FileLockOption("")
402+
pri := fileops.FilePriorityOption(fileops.IO_PRIORITY_NORMAL)
403+
bfCols := []string{sparseindex.FullTextIndex}
404+
filePath := sparseindex.GetBloomFilterFilePath(testDir, "", bfCols[0])
405+
406+
fd, err := fileops.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0640, lock, pri)
407+
if err != nil {
408+
log.Error("open detached bloom filter file fail", zap.String("name", testDir), zap.Error(err))
409+
t.Fatal(err)
410+
}
411+
defer func() {
412+
_ = fd.Close()
413+
}()
414+
415+
_, err = fd.Write(buf)
416+
if err != nil {
417+
t.Fatal(err)
418+
}
419+
err = detachedInfo.checkAndTruncateBfFiles(testDir, "", bfCols, true)
420+
if err != nil {
421+
t.Fatal(err)
422+
}
423+
424+
detachedInfo.lastPkMetaEndBlockId = 3
425+
_, err = fd.Write(buf)
426+
if err != nil {
427+
t.Fatal(err)
428+
}
429+
err = detachedInfo.checkAndTruncateBfFiles(testDir, "", bfCols, true)
386430
if err != nil {
387431
t.Fatal(err)
388432
}

engine/immutable/compact_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -3824,3 +3824,30 @@ func TestWriteMemoryBloomFilterData(t *testing.T) {
38243824
assert.NoError(t, err)
38253825
assert.Equal(t, n*2, msBuilder.localBFCount)
38263826
}
3827+
3828+
func TestGetLocalBloomFilterData(t *testing.T) {
3829+
testCompDir := t.TempDir()
3830+
_ = fileops.RemoveAll(testCompDir)
3831+
sig := interruptsignal.NewInterruptSignal()
3832+
defer func() {
3833+
sig.Close()
3834+
_ = fileops.RemoveAll(testCompDir)
3835+
}()
3836+
3837+
conf := NewColumnStoreConfig()
3838+
tier := uint64(util.Hot)
3839+
lockPath := ""
3840+
store := NewTableStore(testCompDir, &lockPath, &tier, true, conf)
3841+
defer store.Close()
3842+
3843+
fileName := NewTSSPFileName(store.NextSequence(), 0, 0, 0, true, &lockPath)
3844+
msBuilder := NewMsBuilder(store.path, "mst", &lockPath, conf, 1, fileName, store.Tier(), nil, 2, config.COLUMNSTORE, nil, 0)
3845+
p := filepath.Join(msBuilder.Path, "full_text_bf.idx")
3846+
skipIndexFilePaths := make([]string, 0, 1)
3847+
skipIndexFilePaths = append(skipIndexFilePaths, p)
3848+
msBuilder.localBFCount = 1
3849+
_, err := msBuilder.getLocalBloomFilterData(skipIndexFilePaths)
3850+
if err == nil {
3851+
t.Fatal("should return no such file or dir")
3852+
}
3853+
}

engine/partition.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/openGemini/openGemini/lib/index"
3838
"github.com/openGemini/openGemini/lib/interruptsignal"
3939
"github.com/openGemini/openGemini/lib/logger"
40+
"github.com/openGemini/openGemini/lib/logstore"
4041
"github.com/openGemini/openGemini/lib/metaclient"
4142
"github.com/openGemini/openGemini/lib/netstorage"
4243
"github.com/openGemini/openGemini/lib/statisticsPusher/statistics"
@@ -664,7 +665,7 @@ func (dbPT *DBPTInfo) loadProcess(opId uint64, thermalShards map[uint64]struct{}
664665

665666
func checkAndTruncateDetachedFiles(d *DetachedMetaInfo, mstInfo *meta.MeasurementInfo, sh *shard) error {
666667
d.obsOpt = mstInfo.ObsOptions
667-
err := d.checkAndTruncateDetachedFiles(sh.filesPath, mstInfo.Name, mstInfo.IndexRelation.GetBloomFilterColumns())
668+
err := d.checkAndTruncateDetachedFiles(sh.filesPath, mstInfo.Name, mstInfo.IndexRelation.GetBloomFilterColumns(), logstore.IsFullTextIdx(&mstInfo.IndexRelation))
668669
if err != nil {
669670
return err
670671
}

0 commit comments

Comments
 (0)