
Commit

some changes
saeid-a committed Jun 29, 2021
1 parent 3a0157f commit 81c0f3f
Showing 17 changed files with 290 additions and 77 deletions.
27 changes: 18 additions & 9 deletions IO.go
@@ -5,6 +5,8 @@ import (
)

func (fse *FSEngine) writeInBlock(data []byte, blockIndex uint32) (n int, err error) {
// fse.log.Infov("write in block", "blockIndex", blockIndex,
// "maxNumberOfBlocks", fse.maxNumberOfBlocks, "len(data)", len(data))
if blockIndex >= fse.maxNumberOfBlocks {
return 0, ErrBlockIndexOutOFRange
}
@@ -18,6 +20,7 @@ func (fse *FSEngine) writeInBlock(data []byte, blockIndex uint32) (n int, err er
}

func (fse *FSEngine) ReadBlock(blockIndex uint32) ([]byte, error) {
// fse.log.Infov("read in block", "blockIndex", blockIndex)
if blockIndex >= fse.maxNumberOfBlocks {
return nil, ErrBlockIndexOutOFRange
}
@@ -32,6 +35,9 @@ func (fse *FSEngine) ReadBlock(blockIndex uint32) ([]byte, error) {
return buf, ErrDataBlockMismatch
}
data, err := fse.parseBlock(buf)
if err != nil {
return nil, err
}
return data, nil
}

@@ -65,25 +71,30 @@ func (fse *FSEngine) WriteAt(b []byte, off int64, fileID uint32) (n int, err err
func (fse *FSEngine) Write(data []byte, fileID uint32) (int, error) {
fse.rIBlockMux.Lock()
defer fse.rIBlockMux.Unlock()
dataSize := len(data)
if dataSize == 0 {
return 0, fmt.Errorf("data siz is zero, file ID: %v ", fileID)
}
vf, ok := fse.openFiles[fileID]
if !ok {
return 0, fmt.Errorf("this file ID: %v did not opened", fileID)
}
n := 0
var err error
blocksNum := uint32(len(data) / BLOCKSIZEUSABLE)
for i := uint32(0); i < blocksNum; i++ {
for {
if n >= dataSize {
return n, nil
}
previousBlock := vf.GetLastBlock()
// blockID := fse.blockAllocationMap.FindNextFreeBlockAndAllocate()
blockID := fse.header.FindNextFreeBlockAndAllocate()
var d []byte
if i == (blocksNum - 1) {
d, err = fse.prepareBlock(data[i*BLOCKSIZEUSABLE:(i+1)*BLOCKSIZEUSABLE], fileID, previousBlock, blockID)
if dataSize >= n+int(fse.blockSizeUsable) {
d, err = fse.prepareBlock(data[n:n+int(fse.blockSizeUsable)], fileID, previousBlock, blockID)
if err != nil {
return 0, err
}
} else {
d, err = fse.prepareBlock(data[i*BLOCKSIZEUSABLE:], fileID, previousBlock, blockID)
d, err = fse.prepareBlock(data[n:], fileID, previousBlock, blockID)
if err != nil {
return 0, err
}
@@ -107,10 +118,8 @@ func (fse *FSEngine) Write(data []byte, fileID uint32) (int, error) {
if m != len(d) {
return 0, fmt.Errorf("block with size: %v did not write correctly, n = %v", m, len(d))
}
n = m + n
n = m - BlockHeaderSize + n
}

return n, nil
}

// It is event handler
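
For readers following the rewritten Write above: the loop no longer iterates over a precomputed block count; it advances n through data in blockSizeUsable-sized slices and counts only payload bytes toward the returned total (hence n = m - BlockHeaderSize + n). Below is a minimal standalone sketch of that accounting, assuming a 16-byte block header; the constants and helper are stand-ins, not the real FSEngine methods.

package main

import "fmt"

const blockHeaderSize = 16 // assumption: four uint32 header fields, as in prepareBlock
const blockSizeUsable = 48 // stand-in for fse.blockSizeUsable (blockSize minus header)

// writeChunks mirrors the loop structure of the new Write: advance n through
// data one usable block at a time and return the number of payload bytes written.
func writeChunks(data []byte) (int, error) {
	n := 0
	for n < len(data) {
		end := n + blockSizeUsable
		if end > len(data) {
			end = len(data) // last, partially filled block
		}
		chunk := data[n:end]
		// A real implementation would call prepareBlock and writeInBlock here;
		// m stands in for the header+payload byte count those calls report.
		m := blockHeaderSize + len(chunk)
		// Count only payload bytes, matching "n = m - BlockHeaderSize + n".
		n += m - blockHeaderSize
	}
	return n, nil
}

func main() {
	written, _ := writeChunks(make([]byte, 100))
	fmt.Println(written) // 100: every payload byte is accounted for exactly once
}

On success the returned count equals len(data), which is the payload-byte semantics the tests below rely on.
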
167 changes: 164 additions & 3 deletions IO_test.go
@@ -3,8 +3,11 @@ package fsEngine
import (
"math/rand"
"os"
"strconv"
"testing"

"github.com/fanap-infra/FSEngine/internal/virtualFile"

"github.com/fanap-infra/FSEngine/pkg/utils"
"github.com/fanap-infra/log"
"github.com/stretchr/testify/assert"
@@ -23,7 +26,7 @@ func TestIO_OneVirtualFile(t *testing.T) {

MaxID := 1000
MaxByteArraySize := int(blockSizeTest * 0.5)
VFSize := int(1.5 * blockSizeTest)
VFSize := int(3.5 * blockSizeTest)
vfID := uint32(rand.Intn(MaxID))
vf, err := fse.NewVirtualFile(vfID, "test")
assert.Equal(t, nil, err)
@@ -47,18 +50,19 @@ func TestIO_OneVirtualFile(t *testing.T) {
err = vf.Close()
assert.Equal(t, nil, err)

vf, err = fse.OpenVirtualFile(vfID)
vf2, err := fse.OpenVirtualFile(vfID)
assert.Equal(t, nil, err)

for _, v := range bytes {
buf := make([]byte, len(v))
_, err := vf.Read(buf)
m, err := vf2.Read(buf)

assert.Equal(t, nil, err)
if err != nil {
break
}

assert.Equal(t, len(v), m)
assert.Equal(t, v, buf)
}

@@ -67,3 +71,160 @@ func TestIO_OneVirtualFile(t *testing.T) {
_ = utils.DeleteFile(homePath + fsPathTest)
_ = utils.DeleteFile(homePath + headerPathTest)
}

func TestIO_MultipleVirtualFileConsecutively(t *testing.T) {
homePath, err := os.UserHomeDir()
assert.Equal(t, nil, err)
_ = utils.DeleteFile(homePath + fsPathTest)
_ = utils.DeleteFile(homePath + headerPathTest)
fse, err := CreateFileSystem(homePath+fsPathTest, fileSizeTest, blockSizeTest, log.GetScope("test"))
assert.Equal(t, nil, err)
assert.Equal(t, true, utils.FileExists(homePath+fsPathTest))
assert.Equal(t, true, utils.FileExists(homePath+headerPathTest))

MaxID := 1000
MaxByteArraySize := int(blockSizeTest * 0.5)
VFSize := int(3.5 * blockSizeTest)

virtualFiles := make([]*virtualFile.VirtualFile, 0)
numberOfVFs := 5
bytes := make([][][]byte, numberOfVFs)
vfIDs := make([]uint32, 0)
for i := 0; i < numberOfVFs; i++ {
vfID := uint32(rand.Intn(MaxID))
if utils.ItemExists(vfIDs, vfID) {
i = i - 1
continue
}
vfIDs = append(vfIDs, vfID)
vf, err := fse.NewVirtualFile(vfID, "test"+strconv.Itoa(i))
if assert.Equal(t, nil, err) {
virtualFiles = append(virtualFiles, vf)
}
}
if len(virtualFiles) != numberOfVFs {
return
}

for j, vf := range virtualFiles {
size := 0
for {
token := make([]byte, uint32(rand.Intn(MaxByteArraySize)))
m, err := rand.Read(token)
assert.Equal(t, nil, err)
bytes[j] = append(bytes[j], token)
size = size + m
n, err := vf.Write(token)
assert.Equal(t, nil, err)
assert.Equal(t, m, n)

if size > VFSize {
break
}
}
err = vf.Close()
assert.Equal(t, nil, err)
}

for i, vBytes := range bytes {
vf2, err := fse.OpenVirtualFile(vfIDs[i])
assert.Equal(t, nil, err)

for _, v := range vBytes {
buf := make([]byte, len(v))
m, err := vf2.Read(buf)

assert.Equal(t, nil, err)
if err != nil {
break
}

assert.Equal(t, len(v), m)
assert.Equal(t, v, buf)
}
}

err = fse.Close()
assert.Equal(t, nil, err)
_ = utils.DeleteFile(homePath + fsPathTest)
_ = utils.DeleteFile(homePath + headerPathTest)
}

// ToDo: make this test write to the virtual files in an interleaved order rather than consecutively
func TestIO_MultipleVirtualFile(t *testing.T) {
homePath, err := os.UserHomeDir()
assert.Equal(t, nil, err)
_ = utils.DeleteFile(homePath + fsPathTest)
_ = utils.DeleteFile(homePath + headerPathTest)
fse, err := CreateFileSystem(homePath+fsPathTest, fileSizeTest, blockSizeTest, log.GetScope("test"))
assert.Equal(t, nil, err)
assert.Equal(t, true, utils.FileExists(homePath+fsPathTest))
assert.Equal(t, true, utils.FileExists(homePath+headerPathTest))

MaxID := 1000
MaxByteArraySize := int(blockSizeTest * 0.5)
VFSize := int(3.5 * blockSizeTest)

virtualFiles := make([]*virtualFile.VirtualFile, 0)
numberOfVFs := 5
bytes := make([][][]byte, numberOfVFs)
vfIDs := make([]uint32, 0)
for i := 0; i < numberOfVFs; i++ {
vfID := uint32(rand.Intn(MaxID))
if utils.ItemExists(vfIDs, vfID) {
i = i - 1
continue
}
vfIDs = append(vfIDs, vfID)
vf, err := fse.NewVirtualFile(vfID, "test"+strconv.Itoa(i))
if assert.Equal(t, nil, err) {
virtualFiles = append(virtualFiles, vf)
}
}
if len(virtualFiles) != numberOfVFs {
return
}

for j, vf := range virtualFiles {
size := 0
for {
token := make([]byte, uint32(rand.Intn(MaxByteArraySize)))
m, err := rand.Read(token)
assert.Equal(t, nil, err)
bytes[j] = append(bytes[j], token)
size = size + m
n, err := vf.Write(token)
assert.Equal(t, nil, err)
assert.Equal(t, m, n)

if size > VFSize {
break
}
}
err = vf.Close()
assert.Equal(t, nil, err)
}

for i, vBytes := range bytes {
vf2, err := fse.OpenVirtualFile(vfIDs[i])
assert.Equal(t, nil, err)

for _, v := range vBytes {
buf := make([]byte, len(v))
m, err := vf2.Read(buf)

assert.Equal(t, nil, err)
if err != nil {
break
}

assert.Equal(t, len(v), m)
assert.Equal(t, v, buf)
}
}

err = fse.Close()
assert.Equal(t, nil, err)
_ = utils.DeleteFile(homePath + fsPathTest)
_ = utils.DeleteFile(homePath + headerPathTest)
}
15 changes: 9 additions & 6 deletions blocks.go
@@ -21,12 +21,15 @@ func (fse *FSEngine) NoSpace() uint32 {
func (fse *FSEngine) prepareBlock(data []byte, fileID uint32, previousBlock uint32, blockID uint32) ([]byte, error) {
dataTmp := make([]byte, 0)

a := make([]byte, BlockHeaderSize)
binary.BigEndian.PutUint32(a, blockID)
binary.BigEndian.PutUint32(a, fileID)
binary.BigEndian.PutUint32(a, previousBlock)
binary.BigEndian.PutUint32(a, uint32(len(data)))
dataTmp = append(dataTmp, a...)
headerTmp := make([]byte, 4)
binary.BigEndian.PutUint32(headerTmp, blockID)
dataTmp = append(dataTmp, headerTmp...)
binary.BigEndian.PutUint32(headerTmp, fileID)
dataTmp = append(dataTmp, headerTmp...)
binary.BigEndian.PutUint32(headerTmp, previousBlock)
dataTmp = append(dataTmp, headerTmp...)
binary.BigEndian.PutUint32(headerTmp, uint32(len(data)))
dataTmp = append(dataTmp, headerTmp...)
dataTmp = append(dataTmp, data...)

return dataTmp, nil
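
A note on the prepareBlock change above: the old version wrote all four header fields with PutUint32 into the start of a single BlockHeaderSize buffer, so each call overwrote the previous value; the new version encodes each field into its own 4 bytes before appending it. Below is a standalone sketch of the same 16-byte layout, assuming BlockHeaderSize is 16 (field order blockID, fileID, previousBlock, data length is taken from the diff; the explicit offsets are purely for illustration).

package main

import (
	"encoding/binary"
	"fmt"
)

const blockHeaderSize = 16 // 4 fields x 4 bytes: blockID, fileID, previousBlock, len(data)

// buildBlock lays out the header the way the new prepareBlock does, but with
// explicit offsets into one pre-sized buffer instead of repeated appends.
func buildBlock(data []byte, fileID, previousBlock, blockID uint32) []byte {
	buf := make([]byte, blockHeaderSize+len(data))
	binary.BigEndian.PutUint32(buf[0:4], blockID)
	binary.BigEndian.PutUint32(buf[4:8], fileID)
	binary.BigEndian.PutUint32(buf[8:12], previousBlock)
	binary.BigEndian.PutUint32(buf[12:16], uint32(len(data)))
	copy(buf[blockHeaderSize:], data)
	return buf
}

func main() {
	b := buildBlock([]byte("payload"), 7, 3, 42)
	fmt.Println(len(b), b[:blockHeaderSize]) // 23 [0 0 0 42 0 0 0 7 0 0 0 3 0 0 0 7]
}

Writing into explicit offsets of one pre-sized buffer is equivalent to the append-per-field approach in the diff, but avoids the intermediate 4-byte slice.
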
6 changes: 2 additions & 4 deletions configs/configs.go
@@ -4,9 +4,9 @@ import "sync"

var configs = struct {
MaxSizeVirtualFile uint64
mutex sync.Mutex
mutex sync.Mutex
}{
MaxSizeVirtualFile: 10*1024*1024*1024, // 10 GB
MaxSizeVirtualFile: 10 * 1024 * 1024 * 1024, // 10 GB
}

func SetConfigs(maxSizeVirtualFile uint64) {
@@ -20,5 +20,3 @@ func GetMaxSizeVirtualFile() uint64 {
defer configs.mutex.Unlock()
return configs.MaxSizeVirtualFile
}


2 changes: 1 addition & 1 deletion crud.go
@@ -42,7 +42,7 @@ func (fse *FSEngine) OpenVirtualFile(id uint32) (*virtualFile.VirtualFile, error
if err != nil {
return nil, err
}
vf := virtualFile.OpenVirtualFile(fileInfo, fse.blockSize-BlockHeaderSize, fse, blm,
vf := virtualFile.OpenVirtualFile(&fileInfo, fse.blockSize-BlockHeaderSize, fse, blm,
int(fse.blockSize-BlockHeaderSize)*VirtualFileBufferBlockNumber, fse.log)
//err = fse.header.AddVirtualFile(id, fileInfo.GetName())
//if err != nil {
3 changes: 2 additions & 1 deletion file.go
@@ -24,6 +24,7 @@ type FSEngine struct {
LastFiletime time.Time // time where the first data of the file has been written
maxNumberOfBlocks uint32 // total number of blocks in Archiver
blockSize uint32 // in bytes, size of each block
blockSizeUsable uint32
// lastWrittenBlock uint32 // the last block that has been written into
blockAllocationMap *blockAllocationMap.BlockAllocationMap // BAM data in memory coded with roaring, to be synced later on to Disk.
openFiles map[uint32]*virtualFile.VirtualFile
@@ -32,7 +33,7 @@ type FSEngine struct {
RMux sync.Mutex
log *log.Logger
// fiMux sync.RWMutex
fiChecksum uint32
// fiChecksum uint32
// bamChecksum uint32
// fsMux sync.Mutex
rIBlockMux sync.Mutex
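
The new blockSizeUsable field added to FSEngine above pairs with the rewritten Write loop in IO.go. Its initialization is not part of this commit; presumably it caches the fse.blockSize-BlockHeaderSize expression that crud.go already passes to OpenVirtualFile. A tiny illustrative sketch of that relationship (the header size and block size here are assumptions):

package main

import "fmt"

const blockHeaderSize = 16 // per-block header written by prepareBlock

// usableBytes shows the relationship blockSizeUsable presumably captures:
// the payload capacity of a block once the header is accounted for.
func usableBytes(blockSize uint32) uint32 {
	return blockSize - blockHeaderSize
}

func main() {
	fmt.Println(usableBytes(4096)) // 4080 payload bytes per 4 KiB block
}
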
2 changes: 1 addition & 1 deletion internal/Header/constant.go
@@ -21,7 +21,7 @@ const (
FileIndexMaxByteSize = 100000

BlockAllocationMapByteIndex = FileIndexByteIndex + FileIndexMaxByteSize // BackUp Size - 1 - FileIndexBlockSize - AllocationMapBlockSize
BlockAllocationMaxByteSize = 100000
BlockAllocationMaxByteSize = 100000

//MaxArchiverTime = 3600 * time.Second
//ArchiverCheckerIter = 60 * time.Second