diff --git a/src/internal/m365/backup_test.go b/src/internal/m365/backup_test.go
index bc1293157e..f633564759 100644
--- a/src/internal/m365/backup_test.go
+++ b/src/internal/m365/backup_test.go
@@ -28,6 +28,7 @@ import (
     "github.com/alcionai/corso/src/pkg/selectors"
     selTD "github.com/alcionai/corso/src/pkg/selectors/testdata"
     "github.com/alcionai/corso/src/pkg/services/m365/api"
+    "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
 )
 
 // ---------------------------------------------------------------------------
@@ -53,6 +54,10 @@ func TestDataCollectionIntgSuite(t *testing.T) {
 func (suite *DataCollectionIntgSuite) SetupSuite() {
     t := suite.T()
 
+    ctx, flush := tester.NewContext(t)
+    defer flush()
+    graph.InitializeConcurrencyLimiter(ctx, false, 4)
+
     suite.user = tconfig.M365UserID(t)
     suite.site = tconfig.M365SiteID(t)
 
@@ -274,7 +279,6 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
     ctrl := newController(ctx, suite.T(), path.SharePointService)
     tests := []struct {
         name        string
-        expected    int
         getSelector func() selectors.Selector
     }{
         {
@@ -286,8 +290,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
             },
         },
         {
-            name:     "Lists",
-            expected: 0,
+            name: "Lists",
             getSelector: func() selectors.Selector {
                 sel := selectors.NewSharePointBackup(selSites)
                 sel.Include(sel.Lists(selectors.Any()))
@@ -329,8 +332,8 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
             }
 
             // we don't know an exact count of drives this will produce,
-            // but it should be more than one.
-            assert.Less(t, test.expected, len(collections))
+            // but it should be more than zero.
+            assert.NotEmpty(t, collections)
 
             for _, coll := range collections {
                 for object := range coll.Items(ctx, fault.New(true)) {
@@ -465,7 +468,8 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() {
     assert.True(t, excludes.Empty())
 
     for _, collection := range cols {
-        t.Logf("Path: %s\n", collection.FullPath().String())
+        assert.Equal(t, path.SharePointService, collection.FullPath().Service())
+        assert.Equal(t, path.ListsCategory, collection.FullPath().Category())
 
         for item := range collection.Items(ctx, fault.New(true)) {
             t.Log("File: " + item.ID())
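The updated assertions above no longer compare against a precomputed count; instead they verify results by draining each produced collection through `Items(ctx, fault.New(true))`. A hypothetical helper along the following lines (not part of this change; it only assumes the `data.BackupCollection` surface and the `fault` package already referenced in these hunks) shows the drain-and-count step those loops perform:

```go
// Sketch only: counts the items streamed by a set of backup collections.
// countStreamedItems is a hypothetical name. Assumed imports: context plus
// corso's data and fault packages, with the import paths shown in the hunks above.
// The Items channel is closed by the producing goroutine, so each inner range
// terminates on its own once the collection is drained.
func countStreamedItems(
    ctx context.Context,
    cols []data.BackupCollection,
    errs *fault.Bus,
) int {
    var n int

    for _, c := range cols {
        for range c.Items(ctx, errs) {
            n++
        }
    }

    return n
}
```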
diff --git a/src/internal/m365/collection/site/backup.go b/src/internal/m365/collection/site/backup.go
index c759462cdf..961bf1e082 100644
--- a/src/internal/m365/collection/site/backup.go
+++ b/src/internal/m365/collection/site/backup.go
@@ -123,6 +123,7 @@ func CollectPages(
     }
 
     collection := NewCollection(
+        nil,
         dir,
         ac,
         scope,
@@ -139,6 +140,7 @@ func CollectPages(
 
 func CollectLists(
     ctx context.Context,
+    bh backupHandler,
     bpc inject.BackupProducerConfig,
     ac api.Client,
     tenantID string,
@@ -175,6 +177,7 @@ func CollectLists(
     }
 
     collection := NewCollection(
+        bh,
         dir,
         ac,
         scope,
diff --git a/src/internal/m365/collection/site/backup_test.go b/src/internal/m365/collection/site/backup_test.go
index d80abe3586..4f7251b799 100644
--- a/src/internal/m365/collection/site/backup_test.go
+++ b/src/internal/m365/collection/site/backup_test.go
@@ -21,26 +21,26 @@ import (
     "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
 )
 
-type SharePointPagesSuite struct {
+type SharePointSuite struct {
     tester.Suite
 }
 
-func TestSharePointPagesSuite(t *testing.T) {
-    suite.Run(t, &SharePointPagesSuite{
+func TestSharePointSuite(t *testing.T) {
+    suite.Run(t, &SharePointSuite{
         Suite: tester.NewIntegrationSuite(
             t,
             [][]string{tconfig.M365AcctCredEnvs}),
     })
 }
 
-func (suite *SharePointPagesSuite) SetupSuite() {
+func (suite *SharePointSuite) SetupSuite() {
     ctx, flush := tester.NewContext(suite.T())
     defer flush()
 
     graph.InitializeConcurrencyLimiter(ctx, false, 4)
 }
 
-func (suite *SharePointPagesSuite) TestCollectPages() {
+func (suite *SharePointSuite) TestCollectPages() {
     t := suite.T()
 
     ctx, flush := tester.NewContext(t)
@@ -81,3 +81,47 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
     assert.NoError(t, err, clues.ToCore(err))
     assert.NotEmpty(t, col)
 }
+
+func (suite *SharePointSuite) TestCollectLists() {
+    t := suite.T()
+
+    ctx, flush := tester.NewContext(t)
+    defer flush()
+
+    var (
+        siteID  = tconfig.M365SiteID(t)
+        a       = tconfig.NewM365Account(t)
+        counter = count.New()
+    )
+
+    creds, err := a.M365Config()
+    require.NoError(t, err, clues.ToCore(err))
+
+    ac, err := api.NewClient(
+        creds,
+        control.DefaultOptions(),
+        counter)
+    require.NoError(t, err, clues.ToCore(err))
+
+    bpc := inject.BackupProducerConfig{
+        LastBackupVersion: version.NoBackup,
+        Options:           control.DefaultOptions(),
+        ProtectedResource: mock.NewProvider(siteID, siteID),
+    }
+
+    sel := selectors.NewSharePointBackup([]string{siteID})
+
+    bh := NewListsBackupHandler(bpc.ProtectedResource.ID(), ac.Lists())
+
+    col, err := CollectLists(
+        ctx,
+        bh,
+        bpc,
+        ac,
+        creds.AzureTenantID,
+        sel.Lists(selectors.Any())[0],
+        (&MockGraphService{}).UpdateStatus,
+        fault.New(true))
+    require.NoError(t, err, clues.ToCore(err))
+    assert.Less(t, 0, len(col))
+}
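The new test wires a concrete lists backup handler into `CollectLists`, so the collection logic depends only on a one-method getter rather than on the Graph client itself. A self-contained sketch of that injection pattern in plain Go (all names here are illustrative, not corso's):

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// getByIDer mirrors the shape of the one-method interface this change
// introduces: the collection only needs "fetch one item by ID".
type getByIDer interface {
    GetItemByID(ctx context.Context, id string) (string, error)
}

// apiGetter stands in for the production handler that wraps an API client.
type apiGetter struct{ data map[string]string }

func (g apiGetter) GetItemByID(_ context.Context, id string) (string, error) {
    v, ok := g.data[id]
    if !ok {
        return "", errors.New("not found: " + id)
    }

    return v, nil
}

// fakeGetter stands in for a test double; failures can be injected via err.
type fakeGetter struct{ err error }

func (g fakeGetter) GetItemByID(_ context.Context, id string) (string, error) {
    return "fake-" + id, g.err
}

// collect depends only on the interface, so either getter can be injected.
func collect(ctx context.Context, g getByIDer, ids []string) []string {
    out := make([]string, 0, len(ids))

    for _, id := range ids {
        item, err := g.GetItemByID(ctx, id)
        if err != nil {
            continue // treated as recoverable: skip the item, keep going
        }

        out = append(out, item)
    }

    return out
}

func main() {
    ctx := context.Background()
    live := apiGetter{data: map[string]string{"list1": "site list one"}}

    fmt.Println(collect(ctx, live, []string{"list1", "missing"}))
    fmt.Println(collect(ctx, fakeGetter{}, []string{"list1", "list2"}))
}
```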
diff --git a/src/internal/m365/collection/site/collection.go b/src/internal/m365/collection/site/collection.go
index 6450c8e770..7c99f54ef2 100644
--- a/src/internal/m365/collection/site/collection.go
+++ b/src/internal/m365/collection/site/collection.go
@@ -4,6 +4,8 @@ import (
     "bytes"
     "context"
     "io"
+    "sync"
+    "sync/atomic"
 
     "github.com/alcionai/clues"
     "github.com/microsoft/kiota-abstractions-go/serialization"
@@ -42,25 +44,29 @@ const (
 
 var _ data.BackupCollection = &Collection{}
 
-// Collection is the SharePoint.List implementation of data.Collection. SharePoint.Libraries collections are supported
-// by the oneDrive.Collection as the calls are identical for populating the Collection
+// Collection is the SharePoint.List or SharePoint.Page implementation of data.Collection.
+
+// SharePoint.Libraries collections are supported by the oneDrive.Collection
+// as the calls are identical for populating the Collection
 type Collection struct {
-    // data is the container for each individual SharePoint.List
-    data chan data.Item
+    // stream is the container for each individual SharePoint item of (page/list)
+    stream chan data.Item
     // fullPath indicates the hierarchy within the collection
     fullPath path.Path
-    // jobs contain the SharePoint.Site.ListIDs for the associated list(s).
-    jobs []string
+    // jobs contain the SharePoint.List.IDs or SharePoint.Page.IDs
+    items []string // M365 IDs of the items of this collection
     category      path.CategoryType
     client        api.Sites
     ctrl          control.Options
     betaService   *betaAPI.BetaService
     statusUpdater support.StatusUpdater
+    getter        getItemByIDer
 }
 
 // NewCollection helper function for creating a Collection
 func NewCollection(
+    getter getItemByIDer,
     folderPath path.Path,
     ac api.Client,
     scope selectors.SharePointScope,
@@ -69,8 +75,9 @@ func NewCollection(
 ) *Collection {
     c := &Collection{
         fullPath:      folderPath,
-        jobs:          make([]string, 0),
-        data:          make(chan data.Item, collectionChannelBufferSize),
+        items:         make([]string, 0),
+        getter:        getter,
+        stream:        make(chan data.Item, collectionChannelBufferSize),
         client:        ac.Sites(),
         statusUpdater: statusUpdater,
         category:      scope.Category().PathType(),
@@ -86,7 +93,7 @@ func (sc *Collection) SetBetaService(betaService *betaAPI.BetaService) {
 
 // AddJob appends additional objectID to job field
 func (sc *Collection) AddJob(objID string) {
-    sc.jobs = append(sc.jobs, objID)
+    sc.items = append(sc.items, objID)
 }
 
 func (sc *Collection) FullPath() path.Path {
@@ -111,15 +118,15 @@ func (sc *Collection) Items(
     ctx context.Context,
     errs *fault.Bus,
 ) <-chan data.Item {
-    go sc.populate(ctx, errs)
-    return sc.data
+    go sc.streamItems(ctx, errs)
+    return sc.stream
 }
 
 func (sc *Collection) finishPopulation(
     ctx context.Context,
     metrics support.CollectionMetrics,
 ) {
-    close(sc.data)
+    close(sc.stream)
 
     status := support.CreateStatus(
         ctx,
@@ -135,121 +142,101 @@ func (sc *Collection) finishPopulation(
     }
 }
 
-// populate utility function to retrieve data from back store for a given collection
-func (sc *Collection) populate(ctx context.Context, errs *fault.Bus) {
-    metrics, _ := sc.runPopulate(ctx, errs)
-    sc.finishPopulation(ctx, metrics)
-}
-
-func (sc *Collection) runPopulate(
+// streamItems utility function to retrieve data from back store for a given collection
+func (sc *Collection) streamItems(
     ctx context.Context,
     errs *fault.Bus,
-) (support.CollectionMetrics, error) {
-    var (
-        err     error
-        metrics support.CollectionMetrics
-        writer  = kjson.NewJsonSerializationWriter()
-    )
-
-    // TODO: Insert correct ID for CollectionProgress
-    colProgress := observe.CollectionProgress(
-        ctx,
-        sc.fullPath.Category().HumanString(),
-        sc.fullPath.Folders())
-    defer close(colProgress)
-
+) {
     // Switch retrieval function based on category
     switch sc.category {
     case path.ListsCategory:
-        metrics, err = sc.retrieveLists(ctx, writer, colProgress, errs)
+        sc.streamLists(ctx, errs)
     case path.PagesCategory:
-        metrics, err = sc.retrievePages(ctx, sc.client, writer, colProgress, errs)
+        sc.retrievePages(ctx, sc.client, errs)
     }
-
-    return metrics, err
 }
 
-// retrieveLists utility function for collection that downloads and serializes
+// streamLists utility function for collection that downloads and serializes
 // models.Listable objects based on M365 IDs from the jobs field.
-func (sc *Collection) retrieveLists(
+func (sc *Collection) streamLists(
     ctx context.Context,
-    wtr *kjson.JsonSerializationWriter,
-    progress chan<- struct{},
     errs *fault.Bus,
-) (support.CollectionMetrics, error) {
+) {
     var (
-        metrics support.CollectionMetrics
-        el      = errs.Local()
+        metrics  support.CollectionMetrics
+        el       = errs.Local()
+        numLists int64
+        wg       sync.WaitGroup
     )
 
-    // TODO: Fetch lists via Lists client wrapper
-    lists := []models.Listable{}
+    defer sc.finishPopulation(ctx, metrics)
+
+    // TODO: Insert correct ID for CollectionProgress
+    progress := observe.CollectionProgress(ctx, sc.fullPath.Category().HumanString(), sc.fullPath.Folders())
+    defer close(progress)
+
+    semaphoreCh := make(chan struct{}, fetchChannelSize)
+    defer close(semaphoreCh)
 
-    metrics.Objects += len(lists)
     // For each models.Listable, object is serialized and the metrics are collected.
     // The progress is objected via the passed in channel.
-    for _, lst := range lists {
+    for _, listID := range sc.items {
         if el.Failure() != nil {
             break
         }
 
-        byteArray, err := serializeContent(ctx, wtr, lst)
-        if err != nil {
-            el.AddRecoverable(ctx, clues.WrapWC(ctx, err, "serializing list").Label(fault.LabelForceNoBackupCreation))
-            continue
-        }
-
-        size := int64(len(byteArray))
+        wg.Add(1)
+        semaphoreCh <- struct{}{}
 
-        if size > 0 {
-            metrics.Bytes += size
+        sc.handleListItems(ctx, semaphoreCh, progress, numLists, listID, el, metrics)
 
-            metrics.Successes++
-
-            item, err := data.NewPrefetchedItemWithInfo(
-                io.NopCloser(bytes.NewReader(byteArray)),
-                ptr.Val(lst.GetId()),
-                details.ItemInfo{SharePoint: ListToSPInfo(lst, size)})
-            if err != nil {
-                el.AddRecoverable(ctx, clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation))
-                continue
-            }
-
-            sc.data <- item
-            progress <- struct{}{}
-        }
+        wg.Done()
     }
 
-    return metrics, el.Failure()
+    wg.Wait()
+
+    metrics.Objects += int(numLists)
 }
 
 func (sc *Collection) retrievePages(
     ctx context.Context,
     as api.Sites,
-    wtr *kjson.JsonSerializationWriter,
-    progress chan<- struct{},
     errs *fault.Bus,
-) (support.CollectionMetrics, error) {
+) {
     var (
         metrics support.CollectionMetrics
         el      = errs.Local()
     )
 
+    defer sc.finishPopulation(ctx, metrics)
+
+    // TODO: Insert correct ID for CollectionProgress
+    progress := observe.CollectionProgress(ctx, sc.fullPath.Category().HumanString(), sc.fullPath.Folders())
+    defer close(progress)
+
+    wtr := kjson.NewJsonSerializationWriter()
+    defer wtr.Close()
+
     betaService := sc.betaService
     if betaService == nil {
-        return metrics, clues.NewWC(ctx, "beta service required")
+        logger.Ctx(ctx).Error(clues.New("beta service required"))
+        return
     }
 
     parent, err := as.GetByID(ctx, sc.fullPath.ProtectedResource(), api.CallConfig{})
     if err != nil {
-        return metrics, err
+        logger.Ctx(ctx).Error(err)
+
+        return
     }
 
     root := ptr.Val(parent.GetWebUrl())
 
-    pages, err := betaAPI.GetSitePages(ctx, betaService, sc.fullPath.ProtectedResource(), sc.jobs, errs)
+    pages, err := betaAPI.GetSitePages(ctx, betaService, sc.fullPath.ProtectedResource(), sc.items, errs)
     if err != nil {
-        return metrics, err
+        logger.Ctx(ctx).Error(err)
+
+        return
     }
 
     metrics.Objects = len(pages)
@@ -269,25 +256,25 @@ func (sc *Collection) retrievePages(
 
         size := int64(len(byteArray))
 
-        if size > 0 {
-            metrics.Bytes += size
-            metrics.Successes++
-
-            item, err := data.NewPrefetchedItemWithInfo(
-                io.NopCloser(bytes.NewReader(byteArray)),
-                ptr.Val(pg.GetId()),
-                details.ItemInfo{SharePoint: pageToSPInfo(pg, root, size)})
-            if err != nil {
-                el.AddRecoverable(ctx, clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation))
-                continue
-            }
-
-            sc.data <- item
-            progress <- struct{}{}
+        if size == 0 {
+            return
         }
-    }
 
-    return metrics, el.Failure()
+        metrics.Bytes += size
+        metrics.Successes++
+
+        item, err := data.NewPrefetchedItemWithInfo(
+            io.NopCloser(bytes.NewReader(byteArray)),
+            ptr.Val(pg.GetId()),
+            details.ItemInfo{SharePoint: pageToSPInfo(pg, root, size)})
+        if err != nil {
+            el.AddRecoverable(ctx, clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation))
+            continue
+        }
+
+        sc.stream <- item
+        progress <- struct{}{}
+    }
 }
 
 func serializeContent(
@@ -309,3 +296,71 @@
 
     return byteArray, nil
 }
+
+func (sc *Collection) handleListItems(
+    ctx context.Context,
+    semaphoreCh chan struct{},
+    progress chan<- struct{},
+    numLists int64,
+    listID string,
+    el *fault.Bus,
+    metrics support.CollectionMetrics,
+) {
+    defer func() { <-semaphoreCh }()
+
+    writer := kjson.NewJsonSerializationWriter()
+    defer writer.Close()
+
+    var (
+        list models.Listable
+        err  error
+    )
+
+    list, err = sc.getter.GetItemByID(ctx, listID)
+    if err != nil {
+        err = clues.WrapWC(ctx, err, "getting list data").Label(fault.LabelForceNoBackupCreation)
+        el.AddRecoverable(ctx, err)
+
+        return
+    }
+
+    atomic.AddInt64(&numLists, 1)
+
+    if err := writer.WriteObjectValue("", list); err != nil {
+        err = clues.WrapWC(ctx, err, "writing list to serializer").Label(fault.LabelForceNoBackupCreation)
+        el.AddRecoverable(ctx, err)
+
+        return
+    }
+
+    entryBytes, err := writer.GetSerializedContent()
+    if err != nil {
+        err = clues.WrapWC(ctx, err, "serializing list").Label(fault.LabelForceNoBackupCreation)
+        el.AddRecoverable(ctx, err)
+
+        return
+    }
+
+    size := int64(len(entryBytes))
+
+    if size == 0 {
+        return
+    }
+
+    metrics.Bytes += size
+    metrics.Successes++
+
+    rc := io.NopCloser(bytes.NewReader(entryBytes))
+    itemInfo := details.ItemInfo{SharePoint: ListToSPInfo(list, size)}
+
+    item, err := data.NewPrefetchedItemWithInfo(rc, listID, itemInfo)
+    if err != nil {
+        err = clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
+        el.AddRecoverable(ctx, err)
+
+        return
+    }
+
+    sc.stream <- item
+    progress <- struct{}{}
+}
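streamLists bounds its fetch work with a buffered channel used as a semaphore, a WaitGroup, and an atomic counter. A self-contained sketch of that general pattern follows (illustrative names only, not the corso code); in this sketch each fetch runs on its own goroutine and the counter is shared by pointer so every increment is visible once Wait returns:

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// fetchOne stands in for a per-item fetch-and-serialize step.
func fetchOne(id string, processed *int64) {
    // ... fetch and serialize the item here ...
    _ = id
    atomic.AddInt64(processed, 1)
}

func main() {
    var (
        ids       = []string{"list1", "list2", "list3", "list4"}
        processed int64
        wg        sync.WaitGroup
        // buffered channel as a semaphore: at most 4 fetches in flight
        semaphore = make(chan struct{}, 4)
    )

    for _, id := range ids {
        wg.Add(1)
        semaphore <- struct{}{} // acquire a slot

        go func(id string) {
            defer wg.Done()
            defer func() { <-semaphore }() // release the slot

            fetchOne(id, &processed)
        }(id)
    }

    wg.Wait()
    fmt.Println("processed:", atomic.LoadInt64(&processed))
}
```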
diff --git a/src/internal/m365/collection/site/collection_test.go b/src/internal/m365/collection/site/collection_test.go
index 9e1d9d5c83..a177a04246 100644
--- a/src/internal/m365/collection/site/collection_test.go
+++ b/src/internal/m365/collection/site/collection_test.go
@@ -14,8 +14,10 @@ import (
 
     "github.com/alcionai/corso/src/internal/common/ptr"
     "github.com/alcionai/corso/src/internal/data"
+    "github.com/alcionai/corso/src/internal/m365/collection/site/mock"
     betaAPI "github.com/alcionai/corso/src/internal/m365/service/sharepoint/api"
     spMock "github.com/alcionai/corso/src/internal/m365/service/sharepoint/mock"
+    "github.com/alcionai/corso/src/internal/m365/support"
     "github.com/alcionai/corso/src/internal/tester"
     "github.com/alcionai/corso/src/internal/tester/tconfig"
     "github.com/alcionai/corso/src/pkg/account"
@@ -77,6 +79,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
     tables := []struct {
         name, itemName string
         scope          selectors.SharePointScope
+        getter         getItemByIDer
         getDir         func(t *testing.T) path.Path
         getItem        func(t *testing.T, itemName string) data.Item
     }{
         {
@@ -84,6 +87,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
             name:     "List",
             itemName: "MockListing",
             scope:    sel.Lists(selectors.Any())[0],
+            getter:   &mock.ListHandler{},
             getDir: func(t *testing.T) path.Path {
                 dir, err := path.Build(
                     tenant,
@@ -120,6 +124,7 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
             name:     "Pages",
             itemName: "MockPages",
             scope:    sel.Pages(selectors.Any())[0],
+            getter:   nil,
             getDir: func(t *testing.T) path.Path {
                 dir, err := path.Build(
                     tenant,
@@ -156,12 +161,13 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
             defer flush()
 
             col := NewCollection(
+                test.getter,
                 test.getDir(t),
                 suite.ac,
                 test.scope,
                 nil,
                 control.DefaultOptions())
-            col.data <- test.getItem(t, test.itemName)
+            col.stream <- test.getItem(t, test.itemName)
 
             readItems := []data.Item{}
 
@@ -184,6 +190,99 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
     }
 }
 
+func (suite *SharePointCollectionSuite) TestCollection_streamItems() {
+    var (
+        t             = suite.T()
+        statusUpdater = func(*support.ControllerOperationStatus) {}
+        tenant        = "some"
+        resource      = "siteid"
+        list          = "list"
+    )
+
+    table := []struct {
+        name     string
+        category path.CategoryType
+        items    []string
+        getDir   func(t *testing.T) path.Path
+    }{
+        {
+            name:     "no items",
+            items:    []string{},
+            category: path.ListsCategory,
+            getDir: func(t *testing.T) path.Path {
+                dir, err := path.Build(
+                    tenant,
+                    resource,
+                    path.SharePointService,
+                    path.ListsCategory,
+                    false,
+                    list)
+                require.NoError(t, err, clues.ToCore(err))
+
+                return dir
+            },
+        },
+        {
+            name:     "with items",
+            items:    []string{"list1", "list2", "list3"},
+            category: path.ListsCategory,
+            getDir: func(t *testing.T) path.Path {
+                dir, err := path.Build(
+                    tenant,
+                    resource,
+                    path.SharePointService,
+                    path.ListsCategory,
+                    false,
+                    list)
+                require.NoError(t, err, clues.ToCore(err))
+
+                return dir
+            },
+        },
+    }
+    for _, test := range table {
+        suite.Run(test.name, func() {
+            t.Log("running test", test)
+
+            var (
+                errs      = fault.New(true)
+                itemCount int
+            )
+
+            ctx, flush := tester.NewContext(t)
+            defer flush()
+
+            col := &Collection{
+                fullPath:      test.getDir(t),
+                category:      test.category,
+                items:         test.items,
+                getter:        &mock.ListHandler{},
+                stream:        make(chan data.Item),
+                statusUpdater: statusUpdater,
+            }
+
+            itemMap := func(js []string) map[string]struct{} {
+                m := make(map[string]struct{})
+                for _, j := range js {
+                    m[j] = struct{}{}
+                }
+                return m
+            }(test.items)
+
+            go col.streamItems(ctx, errs)
+
+            for item := range col.stream {
+                itemCount++
+                _, ok := itemMap[item.ID()]
+                assert.True(t, ok, "should fetch item")
+            }
+
+            assert.NoError(t, errs.Failure())
+            assert.Equal(t, len(test.items), itemCount, "should see all expected items")
+        })
+    }
+}
+
 // TestRestoreListCollection verifies Graph Restore API for the List Collection
 func (suite *SharePointCollectionSuite) TestListCollection_Restore() {
     t := suite.T()
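The streamItems test above relies on the channel contract that Collection.Items establishes: the producing goroutine sends every item and then closes the stream (via the deferred finishPopulation), so the consumer's range loop ends on its own. A self-contained sketch of that contract with illustrative types (not the corso ones):

```go
package main

import "fmt"

type item struct{ id string }

type collection struct {
    ids    []string
    stream chan item
}

// items launches the producer and hands the read side to the caller,
// mirroring the Collection.Items -> streamItems flow in the diff above.
func (c *collection) items() <-chan item {
    go func() {
        // closing the channel is what lets the consumer's range finish;
        // in the diff this is the deferred finishPopulation call.
        defer close(c.stream)

        for _, id := range c.ids {
            c.stream <- item{id: id}
        }
    }()

    return c.stream
}

func main() {
    c := &collection{
        ids:    []string{"list1", "list2", "list3"},
        stream: make(chan item),
    }

    var count int

    for it := range c.items() {
        count++
        fmt.Println("streamed:", it.id)
    }

    fmt.Println("total:", count) // total: 3
}
```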
diff --git a/src/internal/m365/collection/site/handlers.go b/src/internal/m365/collection/site/handlers.go
new file mode 100644
index 0000000000..72d96d6b58
--- /dev/null
+++ b/src/internal/m365/collection/site/handlers.go
@@ -0,0 +1,15 @@
+package site
+
+import (
+    "context"
+
+    "github.com/microsoftgraph/msgraph-sdk-go/models"
+)
+
+type backupHandler interface {
+    getItemByIDer
+}
+
+type getItemByIDer interface {
+    GetItemByID(ctx context.Context, itemID string) (models.Listable, error)
+}
diff --git a/src/internal/m365/collection/site/lists_handler.go b/src/internal/m365/collection/site/lists_handler.go
new file mode 100644
index 0000000000..1dab11c584
--- /dev/null
+++ b/src/internal/m365/collection/site/lists_handler.go
@@ -0,0 +1,27 @@
+package site
+
+import (
+    "context"
+
+    "github.com/microsoftgraph/msgraph-sdk-go/models"
+
+    "github.com/alcionai/corso/src/pkg/services/m365/api"
+)
+
+var _ backupHandler = &listsBackupHandler{}
+
+type listsBackupHandler struct {
+    ac                api.Lists
+    protectedResource string
+}
+
+func NewListsBackupHandler(protectedResource string, ac api.Lists) listsBackupHandler {
+    return listsBackupHandler{
+        ac:                ac,
+        protectedResource: protectedResource,
+    }
+}
+
+func (bh listsBackupHandler) GetItemByID(ctx context.Context, itemID string) (models.Listable, error) {
+    return bh.ac.GetListByID(ctx, bh.protectedResource, itemID)
+}
diff --git a/src/internal/m365/collection/site/mock/list.go b/src/internal/m365/collection/site/mock/list.go
new file mode 100644
index 0000000000..30b4d50b15
--- /dev/null
+++ b/src/internal/m365/collection/site/mock/list.go
@@ -0,0 +1,23 @@
+package mock
+
+import (
+    "context"
+
+    "github.com/microsoftgraph/msgraph-sdk-go/models"
+
+    "github.com/alcionai/corso/src/internal/common/ptr"
+)
+
+type ListHandler struct {
+    ListItem models.Listable
+    Err      error
+}
+
+func (lh *ListHandler) GetItemByID(ctx context.Context, itemID string) (models.Listable, error) {
+    ls := models.NewList()
+
+    lh.ListItem = ls
+    lh.ListItem.SetId(ptr.To(itemID))
+
+    return ls, lh.Err
+}
diff --git a/src/internal/m365/service/sharepoint/backup.go b/src/internal/m365/service/sharepoint/backup.go
index 9961578a97..d4f82c0bf7 100644
--- a/src/internal/m365/service/sharepoint/backup.go
+++ b/src/internal/m365/service/sharepoint/backup.go
@@ -55,8 +55,11 @@ func ProduceBackupCollections(
 
         switch scope.Category().PathType() {
         case path.ListsCategory:
+            bh := site.NewListsBackupHandler(bpc.ProtectedResource.ID(), ac.Lists())
+
             spcs, err = site.CollectLists(
                 ctx,
+                bh,
                 bpc,
                 ac,
                 creds.AzureTenantID,