Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename data pkg Stream to Item #3966

Merged
merged 5 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,19 @@ import (
"io"
"time"

"github.com/alcionai/clues"

"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)

// ------------------------------------------------------------------------------------------------
// standard ifaces
// ------------------------------------------------------------------------------------------------

var ErrNotFound = clues.New("not found")

type CollectionState int

const (
NewState = CollectionState(iota)
NotMovedState
MovedState
DeletedState
)

// A Collection represents the set of data within a single logical location
// denoted by FullPath.
type Collection interface {
// Items returns a channel from which items in the collection can be read.
// Each returned struct contains the next item in the collection
// The channel is closed when there are no more items in the collection or if
// an unrecoverable error caused an early termination in the sender.
Items(ctx context.Context, errs *fault.Bus) <-chan Stream
Items(ctx context.Context, errs *fault.Bus) <-chan Item
// FullPath returns a path struct that acts as a metadata tag for this
// Collection.
FullPath() path.Path
Expand Down Expand Up @@ -77,32 +60,15 @@ type FetchItemByNamer interface {
// Fetch retrieves an item with the given name from the Collection if it
// exists. Items retrieved with Fetch may still appear in the channel returned
// by Items().
FetchItemByName(ctx context.Context, name string) (Stream, error)
FetchItemByName(ctx context.Context, name string) (Item, error)
}

// NoFetchRestoreCollection is a wrapper for a Collection that returns
// ErrNotFound for all Fetch calls.
type NoFetchRestoreCollection struct {
Collection
FetchItemByNamer
}

func (c NoFetchRestoreCollection) FetchItemByName(context.Context, string) (Stream, error) {
return nil, ErrNotFound
}

type FetchRestoreCollection struct {
Collection
FetchItemByNamer
}

// Stream represents a single item within a Collection
// that can be consumed as a stream (it embeds io.Reader)
type Stream interface {
// ToReader returns an io.Reader for the DataStream
// Item represents a single item within a Collection
type Item interface {
// ToReader returns an io.Reader with the item's data
ToReader() io.ReadCloser
// UUID provides a unique identifier for this data
UUID() string
// ID provides a unique identifier for this item
ID() string
// Deleted returns true if the item represented by this Stream has been
// deleted and should be removed from the current in-progress backup.
Deleted() bool
Expand All @@ -125,40 +91,20 @@ type PreviousLocationPather interface {
PreviousLocationPath() details.LocationIDer
}

// StreamInfo is used to provide service specific
// information about the Stream
type StreamInfo interface {
// ItemInfo returns the details.ItemInfo for the item.
type ItemInfo interface {
Info() details.ItemInfo
}

// StreamSize is used to provide size
// information about the Stream
type StreamSize interface {
// ItemSize returns the size of the item in bytes.
type ItemSize interface {
Size() int64
}

// StreamModTime is used to provide the modified time of the stream's data.
// ItemModTime provides the last modified time of the item.
//
// If an item implements StreamModTime and StreamInfo it should return the same
// If an item implements ItemModTime and ItemInfo it should return the same
// value here as in item.Info().Modified().
type StreamModTime interface {
type ItemModTime interface {
ModTime() time.Time
}

// StateOf lets us figure out the state of the collection from the
// previous and current path
func StateOf(prev, curr path.Path) CollectionState {
if curr == nil || len(curr.String()) == 0 {
return DeletedState
}

if prev == nil || len(prev.String()) == 0 {
return NewState
}

if curr.String() != prev.String() {
return MovedState
}

return NotMovedState
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/alcionai/corso/src/pkg/path"
)

type DataCollectionSuite struct {
type CollectionSuite struct {
tester.Suite
}

func TestDataCollectionSuite(t *testing.T) {
suite.Run(t, &DataCollectionSuite{Suite: tester.NewUnitSuite(t)})
suite.Run(t, &CollectionSuite{Suite: tester.NewUnitSuite(t)})
}

func (suite *DataCollectionSuite) TestStateOf() {
func (suite *CollectionSuite) TestStateOf() {
fooP, err := path.Build("t", "u", path.ExchangeService, path.EmailCategory, false, "foo")
require.NoError(suite.T(), err, clues.ToCore(err))
barP, err := path.Build("t", "u", path.ExchangeService, path.EmailCategory, false, "bar")
Expand Down
54 changes: 54 additions & 0 deletions src/internal/data/implementations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package data
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved out of /data/collections to separate interfaces and non-interfaces (the code was getting jumbled in there). Only movement, nothing new/changed.


import (
"context"

"github.com/alcionai/clues"

"github.com/alcionai/corso/src/pkg/path"
)

var ErrNotFound = clues.New("not found")

type CollectionState int

const (
NewState = CollectionState(iota)
NotMovedState
MovedState
DeletedState
)

type FetchRestoreCollection struct {
Collection
FetchItemByNamer
}

// NoFetchRestoreCollection is a wrapper for a Collection that returns
// ErrNotFound for all Fetch calls.
type NoFetchRestoreCollection struct {
Collection
FetchItemByNamer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I know this is code movement, but this field should be removed. We're providing a function that gives this functionality, not using an embedded interface

Suggested change
FetchItemByNamer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix that in a follow-up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashmrtn please see: #4013

}

func (c NoFetchRestoreCollection) FetchItemByName(context.Context, string) (Item, error) {
return nil, ErrNotFound
}

// StateOf lets us figure out the state of the collection from the
// previous and current path
func StateOf(prev, curr path.Path) CollectionState {
if curr == nil || len(curr.String()) == 0 {
return DeletedState
}

if prev == nil || len(prev.String()) == 0 {
return NewState
}

if curr.String() != prev.String() {
return MovedState
}

return NotMovedState
}
36 changes: 18 additions & 18 deletions src/internal/data/mock/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,46 @@ import (
)

// ---------------------------------------------------------------------------
// stream
// Item
// ---------------------------------------------------------------------------

var _ data.Stream = &Stream{}
var _ data.Item = &Item{}

type Stream struct {
ID string
Reader io.ReadCloser
ReadErr error
ItemSize int64
ModifiedTime time.Time
type Item struct {
DeletedFlag bool
ItemID string
ItemInfo details.ItemInfo
ItemSize int64
ModifiedTime time.Time
Reader io.ReadCloser
ReadErr error
}

func (s *Stream) UUID() string {
return s.ID
func (s *Item) ID() string {
return s.ItemID
}

func (s Stream) Deleted() bool {
func (s Item) Deleted() bool {
return s.DeletedFlag
}

func (s *Stream) ToReader() io.ReadCloser {
func (s *Item) ToReader() io.ReadCloser {
if s.ReadErr != nil {
return io.NopCloser(errReader{s.ReadErr})
}

return s.Reader
}

func (s *Stream) Info() details.ItemInfo {
func (s *Item) Info() details.ItemInfo {
return s.ItemInfo
}

func (s *Stream) Size() int64 {
func (s *Item) Size() int64 {
return s.ItemSize
}

func (s *Stream) ModTime() time.Time {
func (s *Item) ModTime() time.Time {
return s.ModifiedTime
}

Expand All @@ -77,7 +77,7 @@ var (

type Collection struct{}

func (c Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Stream {
func (c Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
return nil
}

Expand All @@ -97,6 +97,6 @@ func (c Collection) DoNotMergeItems() bool {
return true
}

func (c Collection) FetchItemByName(ctx context.Context, name string) (data.Stream, error) {
return &Stream{}, clues.New("not implemented")
func (c Collection) FetchItemByName(ctx context.Context, name string) (data.Item, error) {
return &Item{}, clues.New("not implemented")
}
18 changes: 9 additions & 9 deletions src/internal/kopia/data_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

var (
_ data.RestoreCollection = &kopiaDataCollection{}
_ data.Stream = &kopiaDataStream{}
_ data.Item = &kopiaDataStream{}
)

type kopiaDataCollection struct {
Expand All @@ -29,9 +29,9 @@ type kopiaDataCollection struct {
func (kdc *kopiaDataCollection) Items(
ctx context.Context,
errs *fault.Bus,
) <-chan data.Stream {
) <-chan data.Item {
var (
res = make(chan data.Stream)
res = make(chan data.Item)
el = errs.Local()
loadCount = 0
)
Expand Down Expand Up @@ -72,12 +72,12 @@ func (kdc kopiaDataCollection) FullPath() path.Path {
}

// Fetch returns the file with the given name from the collection as a
// data.Stream. Returns a data.ErrNotFound error if the file isn't in the
// data.Item. Returns a data.ErrNotFound error if the file isn't in the
// collection.
func (kdc kopiaDataCollection) FetchItemByName(
ctx context.Context,
name string,
) (data.Stream, error) {
) (data.Item, error) {
ctx = clues.Add(ctx, "item_name", clues.Hide(name))

if kdc.dir == nil {
Expand Down Expand Up @@ -119,7 +119,7 @@ func (kdc kopiaDataCollection) FetchItemByName(
}

return &kopiaDataStream{
uuid: name,
id: name,
reader: &restoreStreamReader{
ReadCloser: r,
expectedVersion: kdc.expectedVersion,
Expand All @@ -130,16 +130,16 @@ func (kdc kopiaDataCollection) FetchItemByName(

type kopiaDataStream struct {
reader io.ReadCloser
uuid string
id string
size int64
}

func (kds kopiaDataStream) ToReader() io.ReadCloser {
return kds.reader
}

func (kds kopiaDataStream) UUID() string {
return kds.uuid
func (kds kopiaDataStream) ID() string {
return kds.id
}

func (kds kopiaDataStream) Deleted() bool {
Expand Down
14 changes: 7 additions & 7 deletions src/internal/kopia/data_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/alcionai/corso/src/internal/data"
exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
dataMock "github.com/alcionai/corso/src/internal/data/mock"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
Expand Down Expand Up @@ -240,25 +240,25 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
bus = fault.New(false)
)

for returnedStream := range c.Items(ctx, bus) {
for item := range c.Items(ctx, bus) {
require.Less(t, len(found), len(test.expectedLoaded), "items read safety")

found = append(found, loadedData{})
f := &found[len(found)-1]
f.uuid = returnedStream.UUID()
f.uuid = item.ID()

buf, err := io.ReadAll(returnedStream.ToReader())
buf, err := io.ReadAll(item.ToReader())
if !assert.NoError(t, err, clues.ToCore(err)) {
continue
}

f.data = buf

if !assert.Implements(t, (*data.StreamSize)(nil), returnedStream) {
if !assert.Implements(t, (*data.ItemSize)(nil), item) {
continue
}

ss := returnedStream.(data.StreamSize)
ss := item.(data.ItemSize)

f.size = ss.Size()
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
errFileName2 = "error2"

noErrFileData = "foo bar baz"
errReader = &exchMock.Data{
errReader = &dataMock.Item{
ReadErr: assert.AnError,
}
)
Expand Down
Loading