Skip to content

Commit

Permalink
add ChangeSeekPointer function
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammadVatandoost committed Jul 20, 2021
1 parent 2eb9181 commit e8cb9df
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 86 deletions.
13 changes: 7 additions & 6 deletions IO.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@ import (
)

func (fse *FSEngine) writeInBlock(data []byte, blockIndex uint32) (n int, err error) {
// fse.log.Infov("write in block", "blockIndex", blockIndex,
// fse.log.Infov("FSEngine write in block", "blockIndex", blockIndex,
// "maxNumberOfBlocks", fse.maxNumberOfBlocks, "len(data)", len(data))
if blockIndex >= fse.maxNumberOfBlocks {
return 0, ErrBlockIndexOutOFRange
}

n, err = fse.file.WriteAt(data, int64(blockIndex)*int64(fse.blockSize))
if err != nil {
fse.log.Infov("Error Writing to file", "err", err.Error(), "file", fse.file.Name())
fse.log.Infov("Error Writing to file", "err", err.Error(),
"file", fse.file.Name(), "blockIndex", blockIndex)
}

return
}

func (fse *FSEngine) ReadBlock(blockIndex uint32) ([]byte, error) {
// fse.log.Infov("read in block", "blockIndex", blockIndex)
// fse.log.Infov("FSEngine read in block", "blockIndex", blockIndex)
if blockIndex >= fse.maxNumberOfBlocks {
return nil, ErrBlockIndexOutOFRange
}
Expand Down Expand Up @@ -75,7 +76,7 @@ func (fse *FSEngine) Write(data []byte, fileID uint32) (int, error) {
if dataSize == 0 {
return 0, fmt.Errorf("data siz is zero, file ID: %v ", fileID)
}
vf, ok := fse.openFiles[fileID]
vfs, ok := fse.openFiles[fileID]
if !ok {
return 0, fmt.Errorf("this file ID: %v did not opened", fileID)
}
Expand All @@ -85,7 +86,7 @@ func (fse *FSEngine) Write(data []byte, fileID uint32) (int, error) {
if n >= dataSize {
return n, nil
}
previousBlock := vf.GetLastBlock()
previousBlock := vfs[0].GetLastBlock()
blockID := fse.header.FindNextFreeBlockAndAllocate()
var d []byte
if dataSize >= n+int(fse.blockSizeUsable) {
Expand All @@ -105,7 +106,7 @@ func (fse *FSEngine) Write(data []byte, fileID uint32) (int, error) {
return 0, err
}

err = vf.AddBlockID(blockID)
err = vfs[0].AddBlockID(blockID)
if err != nil {
return 0, err
}
Expand Down
78 changes: 78 additions & 0 deletions IO_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,81 @@ func TestIO_MultipleVirtualFile(t *testing.T) {
_ = utils.DeleteFile(homePath + "/" + fsPath)
_ = utils.DeleteFile(homePath + "/" + headerPath)
}

func TestIO_ChangeSeekPointer(t *testing.T) {
homePath, err := os.UserHomeDir()
assert.Equal(t, nil, err)
_ = utils.DeleteFile(homePath + "/" + fsPath)
_ = utils.DeleteFile(homePath + "/" + headerPath)
eventListener := EventsListener{t: t}
fse, err := CreateFileSystem(homePath, fileSizeTest, blockSizeTest, &eventListener, log.GetScope("test"))
assert.Equal(t, nil, err)
assert.Equal(t, true, utils.FileExists(homePath+"/"+fsPath))
assert.Equal(t, true, utils.FileExists(homePath+"/"+headerPath))
var bytes []byte

MaxID := 1000
MaxByteArraySize := int(blockSizeTest * 0.5)
VFSize := int(3.5 * blockSizeTest)
vfID := uint32(rand.Intn(MaxID))
vf, err := fse.NewVirtualFile(vfID, "test")
assert.Equal(t, nil, err)
size := 0

for {
token := make([]byte, uint32(rand.Intn(MaxByteArraySize)))
m, err := rand.Read(token)
assert.Equal(t, nil, err)
bytes = append(bytes, token...)
size = size + m
n, err := vf.Write(token)
assert.Equal(t, nil, err)
assert.Equal(t, m, n)

if size > VFSize {
break
}
}

err = vf.Close()
assert.Equal(t, nil, err)

vf2, err := fse.OpenVirtualFile(vfID)
assert.Equal(t, nil, err)
segmentSize := 10
readedSize := 0
for {
token := make([]byte, segmentSize)
n, err := vf2.Read(token)

assert.Equal(t, false, err != nil && n == 0)
assert.Equal(t, token[:n], bytes[readedSize:readedSize+n])
readedSize = readedSize + n
if readedSize == size {
break
}
}

testCounter := 0

for {
token := make([]byte, segmentSize)
seekTest := uint32(rand.Intn(size - segmentSize))
err := vf2.ChangeSeekPointer(int64(seekTest))
assert.Equal(t, nil, err)
n, err := vf2.Read(token)
assert.Equal(t, nil, err)
assert.Equal(t, n, segmentSize)
assert.Equal(t, int(seekTest)+n, vf2.GetSeek())
assert.Equal(t, token, bytes[seekTest:int(seekTest)+n])
testCounter++
if testCounter == 5 {
break
}
}

err = fse.Close()
assert.Equal(t, nil, err)
_ = utils.DeleteFile(homePath + "/" + fsPath)
_ = utils.DeleteFile(homePath + "/" + headerPath)
}
51 changes: 10 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,31 @@ First you need to establish storage location on local system and initialize stor

```go
package main

import (
"github.com/fanap-infra/fsEngine"
"github.com/fanap-infra/log"
)

type EventsTrigger struct {
fileID uint32
}
func main() {
// define storage location and size
const storagePath = "/var/fsEngine/volume1"
const storageSize = 1 << 32 // 4GB volume

fileSystem, err := fsEngine.CreateFileSystem(storagePath, storageSize,
fsEngine.BLOCKSIZE, log.GetScope("Example"))
fsEngine.BLOCKSIZE, &EventsTrigger{}, log.GetScope("Example"))
if err != nil {
log.Fatal(err)
}

return
}


// VirtualFileDeleted
// Implement event listener.
func (e EventsTrigger) VirtualFileDeleted(fileID uint32, message string) {
log.Warnf("File with ID:%v, deleted: %s", fileID, message)
return
}
```

This is the barebone definition for using FSEngine, however, to store objects you have to create a virtual identity for the object which here is called a `virtualFile`.
Expand All @@ -74,41 +78,6 @@ This is the barebone definition for using FSEngine, however, to store objects yo
log.Fatal(err)
}
```
## Reading Object

To read object from storage you first need to open the archiver which is internally a `Singleton` object and once initialized, returns the main object. For retrieving an object, provide `object-ID` and use the code bellow.
```go
package main

import (
"github.com/fanap-infra/fsEngine"
"github.com/fanap-infra/log"
)

func main() {
// define storage location and size
const storagePath = "/var/fsEngine/volume1"
const fileID = 68 // Some random fileID

fileSystem, err := fsEngine.ParseFileSystem(storagePath, log.GetScope("Example"))
if err != nil {
log.Fatal(err)
}

virtualFile, err := fileSystem.OpenVirtualFile(fileID)
if err != nil {
log.Fatal(err)
}
data := make([]byte, virtualFile.Size())
_, err = virtualFile.Read(data, fileID)
if err != nil {
log.Fatal(err)
}

return
}
```
An object is stored and read sequentially, but it can be read from a certain point by `ReadAt` function.

[comment]: <> (TODO: Complete readFile section)
[comment]: <> (## Reading an object from storage)
Expand Down
2 changes: 1 addition & 1 deletion constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import "github.com/fanap-infra/fsEngine/pkg/err"
//

const (
fsPath = "fsTest.beh"
fsPath = "fs.beh"
headerPath = "Header.Beh"
FileSystemIdentifier = "BehFS;P "
FileSystemVersion = 1
Expand Down
13 changes: 7 additions & 6 deletions crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ func (fse *FSEngine) NewVirtualFile(id uint32, fileName string) (*virtualFile.Vi
if err != nil {
return nil, err
}
fse.openFiles[id] = vf
fse.openFiles[id] = append(fse.openFiles[id], vf)
return vf, nil
}

func (fse *FSEngine) OpenVirtualFile(id uint32) (*virtualFile.VirtualFile, error) {
fse.crudMutex.Lock()
defer fse.crudMutex.Unlock()
_, ok := fse.openFiles[id]
if ok {
return nil, fmt.Errorf("this ID: %v is opened before", id)
}
//_, ok := fse.openFiles[id]
//if ok {
// return nil, fmt.Errorf("this ID: %v is opened before", id)
//}
fileInfo, err := fse.header.GetFileData(id)
if err != nil {
return nil, err
Expand All @@ -44,7 +44,8 @@ func (fse *FSEngine) OpenVirtualFile(id uint32) (*virtualFile.VirtualFile, error
}
vf := virtualFile.OpenVirtualFile(&fileInfo, fse.blockSize-BlockHeaderSize, fse, blm,
int(fse.blockSize-BlockHeaderSize)*VirtualFileBufferBlockNumber, fse.log)
fse.openFiles[id] = vf
// fse.openFiles[id] = vf
fse.openFiles[id] = append(fse.openFiles[id], vf)
//err = fse.header.AddVirtualFile(id, fileInfo.GetName())
//if err != nil {
// return nil, err
Expand Down
18 changes: 12 additions & 6 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type FSEngine struct {
blockSizeUsable uint32
// lastWrittenBlock uint32 // the last block that has been written into
// blockAllocationMap *blockAllocationMap.BlockAllocationMap // BAM data in memory coded with roaring, to be synced later on to Disk.
openFiles map[uint32]*virtualFile.VirtualFile
openFiles map[uint32][]*virtualFile.VirtualFile
// fileIndex fileIndex.FileIndex
WMux sync.Mutex
RMux sync.Mutex
Expand All @@ -45,11 +45,13 @@ type FSEngine struct {

// Close ...
func (fse *FSEngine) Close() error {
for _, vf := range fse.openFiles {
err := vf.Close()
if err != nil {
fse.log.Warnv("Can not close virtual file", "err", err.Error())
return err
for _, vfs := range fse.openFiles {
for _, vf := range vfs {
err := vf.Close()
if err != nil {
fse.log.Warnv("Can not close virtual file", "err", err.Error())
return err
}
}
}

Expand All @@ -70,3 +72,7 @@ func (fse *FSEngine) Close() error {
func (fse *FSEngine) GetFilePath() string {
return fse.file.Name()
}

func (fse *FSEngine) GetBlockSize() uint32 {
return fse.header.GetBlockSize()
}
15 changes: 12 additions & 3 deletions internal/Header/fileIndex.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@ func (hfs *HFileSystem) updateFileIndex() error {
if err != nil {
return err
}

hfs.fileIndexSize = uint32(len(fi))
checkSum := crc32.ChecksumIEEE(fi)
if hfs.fiChecksum == crc32.ChecksumIEEE(fi) {
if hfs.fiChecksum == checkSum {
return nil
}
hfs.fiChecksum = checkSum
hfs.fileIndexSize = uint32(len(fi))

if hfs.fileIndexSize == 0 {
hfs.log.Warn("file indexes size is zero")
// return fmt.Errorf("fileIndex size %v is Zero",
// hfs.fileIndexSize)
}
if hfs.fileIndexSize > FileIndexMaxByteSize {
return fmt.Errorf("fileIndex size %v is too large, Max valid size: %v",
hfs.fileIndexSize, FileIndexMaxByteSize)
Expand All @@ -65,6 +70,10 @@ func (hfs *HFileSystem) updateFileIndex() error {
func (hfs *HFileSystem) parseFileIndex() error {
buf := make([]byte, hfs.fileIndexSize)

if hfs.fileIndexSize == 0 {
hfs.log.Warnv("file index size is zero", "fileIndexSize", hfs.fileIndexSize)
return nil
}
n, err := hfs.file.ReadAt(buf, FileIndexByteIndex)
if err != nil {
return err
Expand Down
10 changes: 6 additions & 4 deletions internal/Header/fileIndex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ import (
func TestFileIndex(t *testing.T) {
homePath, err := os.UserHomeDir()
assert.Equal(t, err, nil)
_ = utils.DeleteFile(homePath + path)
_ = utils.DeleteFile(homePath + "/" + fsPath)
_ = utils.DeleteFile(homePath + "/" + headerPath)
eHandler := &EventsHandlerTest{}
fs, err := CreateHeaderFS(homePath+path, BLOCKSIZE*1000, BLOCKSIZE, log.GetScope("test"), eHandler)
fs, err := CreateHeaderFS(homePath+"/"+headerPath, BLOCKSIZE*1000, BLOCKSIZE, log.GetScope("test"), eHandler)
assert.Equal(t, err, nil)

err = fs.Close()
assert.Equal(t, err, nil)

fs2, err := ParseHeaderFS(homePath+path, log.GetScope("test2"), eHandler)
fs2, err := ParseHeaderFS(homePath+"/"+headerPath, log.GetScope("test2"), eHandler)
if !assert.Equal(t, err, nil) {
return
}

assert.Equal(t, fs2.blmSize, fs.blmSize)

_ = utils.DeleteFile(homePath + path)
_ = utils.DeleteFile(homePath + "/" + fsPath)
_ = utils.DeleteFile(homePath + "/" + headerPath)
}
1 change: 1 addition & 0 deletions internal/Header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,6 @@ func (hfs *HFileSystem) parseHeader() error {
hfs.fileIndexSize = binary.BigEndian.Uint32(buf[24:28])
hfs.blmSize = binary.BigEndian.Uint32(buf[28:32])

hfs.size = int64(hfs.maxNumberOfBlocks * hfs.blockSize)
return nil
}
Loading

0 comments on commit e8cb9df

Please sign in to comment.