Skip to content

Commit

Permalink
recovering file indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammadVatandoost committed Sep 2, 2021
1 parent b816a9e commit cb1c3db
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 14 deletions.
39 changes: 39 additions & 0 deletions crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,45 @@ func (fse *FSEngine) OpenVirtualFile(id uint32) (*virtualFile.VirtualFile, error
return vf, nil
}

func (fse *FSEngine) OpenVirtualFileForRecovery(id uint32) (*virtualFile.VirtualFile, error) {
fse.crudMutex.Lock()
defer fse.crudMutex.Unlock()
vfInfo, ok := fse.openFiles[id]
if ok {
fileInfo, err := fse.header.GetFileData(id)
if err != nil {
return nil, err
}

vf := virtualFile.OpenVirtualFileForRecovery(&fileInfo, fse.blockSize-constants.BlockHeaderSize, fse, vfInfo.blm,
int(fse.blockSize-constants.BlockHeaderSize)*constants.VirtualFileBufferBlockNumber, fse.log)
vfInfo.numberOfOpened = vfInfo.numberOfOpened + 1
return vf, nil
}

fileInfo, err := fse.header.GetFileData(id)
if err != nil {
return nil, err
}
if len(fileInfo.GetRMapBlocks()) == 0 {
log.Warnv("can not open virtual file, roaring byte array length is zero", "id", id)
return nil, fmt.Errorf("virtual file is empty, id: %v", id)
}
blm, err := blockAllocationMap.Open(fse.log, fse, fse.maxNumberOfBlocks, fileInfo.GetLastBlock(),
fileInfo.GetRMapBlocks())
if err != nil {
return nil, err
}
vf := virtualFile.OpenVirtualFileForRecovery(&fileInfo, fse.blockSize-constants.BlockHeaderSize, fse, blm,
int(fse.blockSize-constants.BlockHeaderSize)*constants.VirtualFileBufferBlockNumber, fse.log)

vfInfo = &VFInfo{id: id, blm: blm, numberOfOpened: 1}
vfInfo.vfs = append(vfInfo.vfs, vf)
fse.openFiles[id] = vfInfo

return vf, nil
}

func (fse *FSEngine) RemoveVirtualFile(id uint32) (int, error) {
fse.crudMutex.Lock()
defer fse.crudMutex.Unlock()
Expand Down
12 changes: 8 additions & 4 deletions pkg/Header/fileIndex.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func (hfs *HFileSystem) updateFileIndex(index uint32) error {
}
hfs.fileIndexSize = uint32(len(fi))

if hfs.fileIndexSize == 0 {
hfs.log.Warn("file indexes size is zero")
}
//if hfs.fileIndexSize == 0 {
// hfs.log.Warn("file indexes size is zero")
//}

if hfs.storeInRedis {
err := hfs.setRedisKeyValue("arch"+fmt.Sprint(hfs.id)+"_fileIndex"+fmt.Sprint(int(index)%len(hfs.fileIndexes)), fi)
Expand Down Expand Up @@ -183,7 +183,11 @@ func (hfs *HFileSystem) UpdateFileOptionalData(fileID uint32, info []byte) error
}

func (hfs *HFileSystem) GetFilesList() []*fileIndex.File {
return hfs.fileIndexes[0].GetFilesList()
filesIndex := make([]*fileIndex.File, 0)
for _, fIndex := range hfs.fileIndexes {
filesIndex = append(filesIndex, fIndex.GetFilesList()...)
}
return filesIndex
}

//func (hfs *HFileSystem) GetFileOptionalData(fileId uint32) ([]byte, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/virtualFile/IO.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,7 @@ func (v *VirtualFile) UpdateFileOptionalData(info []byte) error {
}
return v.fs.UpdateFileIndexes(v.id, v.firstBlockIndex, v.lastBlock, v.fileSize, data, v.optionalData)
}

func (v *VirtualFile) AddFileSize(size uint32) {
v.fileSize = v.fileSize + size
}
27 changes: 27 additions & 0 deletions pkg/virtualFile/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,33 @@ func OpenVirtualFile(fileInfo *fileIndex.File, blockSize uint32,
}
}

func OpenVirtualFileForRecovery(fileInfo *fileIndex.File, blockSize uint32,
fs FS, blm *blockAllocationMap.BlockAllocationMap, bufferSize int, log *log.Logger) *VirtualFile {
return &VirtualFile{
bufRX: make([]byte, 0),
bufTX: make([]byte, 0),
optionalData: fileInfo.Optional,
name: fileInfo.GetName(),
id: fileInfo.GetId(),
Closed: false,
readOnly: false,
// numberOfBlocks: numberOfBlocks,
blockAllocationMap: blm,
allocatedBlock: make([]uint32, 0),
blockSize: blockSize,
//size: uint64(numberOfBlocks * blockSize),
fs: fs,
log: log,
seekPointer: 0,
bufStart: 0,
bufEnd: 0,
nextBlockIndex: 0,
firstBlockIndex: fileInfo.FirstBlock,
bufferSize: bufferSize,
fileSize: fileInfo.GetFileSize(),
}
}

func NewVirtualFile(fileName string, fileID uint32, blockSize uint32, fs FS,
blm *blockAllocationMap.BlockAllocationMap, bufferSize int, log *log.Logger) *VirtualFile {
return &VirtualFile{
Expand Down
4 changes: 4 additions & 0 deletions pkg/virtualFile/virtualFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ func (v *VirtualFile) GetFileSize() uint32 {
func (v *VirtualFile) GetOptionalData() []byte {
return v.optionalData
}

func (v *VirtualFile) GetBLMArray() []uint32 {
return v.blockAllocationMap.ToArray()
}
26 changes: 16 additions & 10 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func RecoverHeaderFileSystem(id uint32, path string, blockSize uint32, eventsHan
continue
}
if n != int(fse.blockSize) {
fse.log.Warnv("can not read block completely", "n", n, "blockSize", fse.blockSize)
continue
}

Expand All @@ -185,29 +184,29 @@ func RecoverHeaderFileSystem(id uint32, path string, blockSize uint32, eventsHan
continue
}
if pBlockID != blockID {
fse.log.Warnv("blockd id is wrong,", "pBlockID", pBlockID, "blockID", blockID)
// fse.log.Warnv("blockd id is wrong,", "pBlockID", pBlockID, "blockID", blockID)
continue
}
if !hfs.CheckIDExist(pFileID) {
fse.log.Infov("find new virtual file", "pFileID", pFileID)
vf, err := fse.NewVirtualFile(pFileID, "test")

vf, isExist := vfs[pFileID]
if !isExist {
vf, err = fse.NewVirtualFile(pFileID, "test")
if err != nil {
fse.log.Warnv("can not add virtual file correctly,", "pFileID", pFileID, "err", err.Error())
continue
}
vfs[pFileID] = vf
}
vf, isExist := vfs[pFileID]
if !isExist {
fse.log.Warnv("virtual file does not added correctly", "pFileID", pFileID)
continue
}

err = vf.AddBlockID(blockID)
if err != nil {
fse.log.Warnv("can not add blockID to vf,", "pFileID", pFileID, "blockID", blockID,
"err", err.Error())
continue
}

vf.AddFileSize(dataSize)

err = fse.header.SetBlockAsAllocated(blockID)
if err != nil {
fse.log.Errorv("can not set block id in header",
Expand All @@ -216,14 +215,21 @@ func RecoverHeaderFileSystem(id uint32, path string, blockSize uint32, eventsHan
}
}

vfBlockCounter := uint32(0)
for i, vf := range vfs {
vfBlockCounter = vfBlockCounter + uint32(len(vf.GetBLMArray()))
err := vf.Close()
if err != nil {
fse.log.Errorv("can not close virtual file",
"i", i, "fileID", vf.GetFileID())
}
}

fileIndexes := fse.header.GetFilesList()
fse.log.Infov("recovery report",
"len(fileIndexes)", len(fileIndexes), "number of blocks", len(fse.header.GetBLMArray()),
"sum of vfsBlocks", vfBlockCounter)

err = hfs.UpdateFSHeader()
if err != nil {
log.Errorv("can not update header", "err", err.Error())
Expand Down

0 comments on commit cb1c3db

Please sign in to comment.