Skip to content

Commit

Permalink
fix/32/pr-ports : Pulled changes from PR 4517
Browse files Browse the repository at this point in the history
Pulled changes from ipfs/kubo#4517, on top of, ipfs#45.
Change added to unblock the `waitPub()` call. With the elimination of
stateSync cause a `updateChildEntry` to happen for `stateFlushed` as
well, causing it to propogate upwards to the parent(s) [fullSync] and
force a publish to happen, hence unblocking `waitPub`.
  • Loading branch information
nmalhotra committed Dec 30, 2018
1 parent 4fb6dc4 commit 2642dbf
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 115 deletions.
168 changes: 94 additions & 74 deletions fd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ import (
mod "github.com/ipfs/go-unixfs/mod"

context "context"

ipld "github.com/ipfs/go-ipld-format"
)

type state uint8

const (
stateFlushed state = iota
stateDirty
stateClosed
)

// One `File` can have many `FileDescriptor`s associated to it
Expand All @@ -31,14 +41,31 @@ type FileDescriptor interface {
}

type fileDescriptor struct {
inode *File
mod *mod.DagModifier
perms int
sync bool
hasChanges bool

// TODO: Where is this variable set?
closed bool
inode *File
mod *mod.DagModifier
flags Flags

state state
}

func (fi *fileDescriptor) checkWrite() error {
if fi.state == stateClosed {
return ErrClosed
}
if !fi.flags.Write {
return fmt.Errorf("file is read-only")
}
return nil
}

func (fi *fileDescriptor) checkRead() error {
if fi.state == stateClosed {
return ErrClosed
}
if !fi.flags.Read {
return fmt.Errorf("file is write-only")
}
return nil
}

// Size returns the size of the file referred to by this descriptor
Expand All @@ -48,69 +75,52 @@ func (fi *fileDescriptor) Size() (int64, error) {

// Truncate truncates the file to size
func (fi *fileDescriptor) Truncate(size int64) error {
if fi.perms == OpenReadOnly {
return fmt.Errorf("cannot call truncate on readonly file descriptor")
if err := fi.checkWrite(); err != nil {
return fmt.Errorf("truncate failed: %s", err)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.Truncate(size)
}

// Write writes the given data to the file at its current offset
func (fi *fileDescriptor) Write(b []byte) (int, error) {
if fi.perms == OpenReadOnly {
return 0, fmt.Errorf("cannot write on not writeable descriptor")
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write failed: %s", err)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.Write(b)
}

// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) Read(b []byte) (int, error) {
if fi.perms == OpenWriteOnly {
return 0, fmt.Errorf("cannot read on write-only descriptor")
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
}
return fi.mod.Read(b)
}

// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if fi.perms == OpenWriteOnly {
return 0, fmt.Errorf("cannot read on write-only descriptor")
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
}
return fi.mod.CtxReadFull(ctx, b)
}

// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *fileDescriptor) Close() error {
defer func() {
switch fi.perms {
case OpenReadOnly:
fi.inode.desclock.RUnlock()
case OpenWriteOnly, OpenReadWrite:
fi.inode.desclock.Unlock()
}
// TODO: `closed` should be set here.
}()

if fi.closed {
panic("attempted to close file descriptor twice!")
if fi.state == stateClosed {
return ErrClosed
}

if fi.hasChanges {
err := fi.mod.Sync()
if err != nil {
return err
}

fi.hasChanges = false

// explicitly stay locked for flushUp call,
// it will manage the lock for us
return fi.flushUp(fi.sync)
if fi.flags.Write {
defer fi.inode.desclock.Unlock()
} else if fi.flags.Read {
defer fi.inode.desclock.RUnlock()
}

return nil
err := fi.flushUp(fi.flags.Sync)
fi.state = stateClosed
return err
}

// Flush generates a new version of the node of the underlying
Expand All @@ -126,47 +136,57 @@ func (fi *fileDescriptor) Flush() error {
// If `fullSync` is set the changes are propagated upwards
// (the `Up` part of `flushUp`).
func (fi *fileDescriptor) flushUp(fullSync bool) error {
nd, err := fi.mod.GetNode()
if err != nil {
return err
}
var nd ipld.Node
switch fi.state {
case stateDirty:
// calls mod.Sync internally.
var err error
nd, err = fi.mod.GetNode()
if err != nil {
return err
}
err = fi.inode.dagService.Add(context.TODO(), nd)
if err != nil {
return err
}
fi.inode.nodeLock.Lock()
fi.inode.node = nd
fi.inode.nodeLock.Unlock()
fallthrough
case stateFlushed:
if !fullSync {
return nil
}

err = fi.inode.dagService.Add(context.TODO(), nd)
if err != nil {
return err
}
// TODO: Very similar logic to the update process in
// `Directory`, the logic should be unified, both structures
// (`File` and `Directory`) are backed by a IPLD node with
// a UnixFS format that is the actual target of the update
// (regenerating it and adding it to the DAG service).

fi.inode.nodeLock.Lock()
fi.inode.node = nd
// TODO: Create a `SetNode` method.
name := fi.inode.name
parent := fi.inode.parent
// TODO: Can the parent be modified? Do we need to do this inside the lock?
fi.inode.nodeLock.Unlock()
// TODO: Maybe all this logic should happen in `File`.

if fullSync {
return parent.updateChildEntry(child{name, nd})
}
fi.inode.nodeLock.Lock()
nd = fi.inode.node
parent := fi.inode.parent
name := fi.inode.name
fi.inode.nodeLock.Unlock()

return nil
if err := parent.updateChildEntry(child{name, nd}); err != nil {
return err
}
fi.state = stateFlushed
return nil
default:
panic("invalid state")
}
}

// Seek implements io.Seeker
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
if fi.state == stateClosed {
return 0, fmt.Errorf("seek failed: %s", ErrClosed)
}
return fi.mod.Seek(offset, whence)
}

// Write At writes the given bytes at the offset 'at'
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
if fi.perms == OpenReadOnly {
return 0, fmt.Errorf("cannot write on not writeable descriptor")
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write-at failed: %s", err)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.WriteAt(b, at)
}
41 changes: 21 additions & 20 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type File struct {
// entire DAG of nodes that comprise the file.
// TODO: Rename, there should be an explicit term for these root nodes
// of a particular sub-DAG that abstract an upper layer's entity.
node ipld.Node
node ipld.Node

// Lock around the `node` that represents this file, necessary because
// there may be many `FileDescriptor`s operating on this `File`.
Expand All @@ -52,13 +52,25 @@ func NewFile(name string, node ipld.Node, parent parent, dserv ipld.DAGService)
return fi, nil
}

const (
OpenReadOnly = iota
OpenWriteOnly
OpenReadWrite
)
func (fi *File) Open(flags Flags) (_ FileDescriptor, _retErr error) {
if flags.Write {
fi.desclock.Lock()
defer func() {
if _retErr != nil {
fi.desclock.Unlock()
}
}()
} else if flags.Read {
fi.desclock.RLock()
defer func() {
if _retErr != nil {
fi.desclock.Unlock()
}
}()
} else {
return nil, fmt.Errorf("file opened for neither reading nor writing")
}

func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
fi.nodeLock.RLock()
node := fi.node
fi.nodeLock.RUnlock()
Expand Down Expand Up @@ -86,16 +98,6 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
// Ok as well.
}

switch flags {
case OpenReadOnly:
fi.desclock.RLock()
case OpenWriteOnly, OpenReadWrite:
fi.desclock.Lock()
default:
// TODO: support other modes
return nil, fmt.Errorf("mode not supported")
}

dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dagService, chunker.DefaultSplitter)
// TODO: Remove the use of the `chunker` package here, add a new `NewDagModifier` in
// `go-unixfs` with the `DefaultSplitter` already included.
Expand All @@ -106,8 +108,7 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {

return &fileDescriptor{
inode: fi,
perms: flags,
sync: sync,
flags: flags,
mod: dmod,
}, nil
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (fi *File) GetNode() (ipld.Node, error) {
// a file without flushing?)
func (fi *File) Flush() error {
// open the file in fullsync mode
fd, err := fi.Open(OpenWriteOnly, true)
fd, err := fi.Open(Flags{Write: true, Sync: true})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 2642dbf

Please sign in to comment.