Skip to content

Commit

Permalink
feat(storage/dataflux): add dataflux interface (#10748)
Browse files Browse the repository at this point in the history
feat: add dataflux interface and helper functions to storage/dataflux.
Dataflux fast-listing will be used to quickly list objects in a bucket in parallel.

Fixes #10731
  • Loading branch information
akansha1812 authored Sep 19, 2024
1 parent 9199843 commit cb7b0a1
Show file tree
Hide file tree
Showing 9 changed files with 831 additions and 1 deletion.
65 changes: 65 additions & 0 deletions storage/dataflux/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Dataflux for Google Cloud Storage Go client library

## Overview
The purpose of this client is to quickly list data stored in GCS.

## Fast List
The fast list component of this client leverages the GCS API to parallelize the listing of files within a GCS bucket. It does this by implementing a worksteal algorithm, where each worker in the list operation is able to steal work from its siblings once it has finished all currently slated listing work. This parallelization leads to a significant real-world speed increase over sequential listing. Note that parallelization is limited by the machine on which the client runs.

Benchmarking has demonstrated that the larger the object count, the better Dataflux performs when compared to a linear listing. Around 100k objects, users will see improvement in listing speed.

### Example Usage

First create a `storage.Client` to use throughout your application:

[snip]:# (storage-1)
```go
ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
log.Fatal(err)
}
```

[snip]:# (storage-2)
```go

// storage.Query to filter objects that the user wants to list.
query := storage.Query{}
// Input for fast-listing.
dfopts := &dataflux.ListerInput{
BucketName: "bucket",
Parallelism: 500,
BatchSize: 500000,
Query: query,
}

// Construct a dataflux lister.
df := dataflux.NewLister(client, dfopts)
defer df.Close()

// List objects in GCS bucket.
for {
objects, err := df.NextBatch(ctx)

if err == iterator.Done {
// No more objects in the bucket to list.
break
}
if err != nil {
log.Fatal(err)
}
// TODO: process objects
}
```

### Fast List Benchmark Results
VM used : n2d-standard-48
Region: us-central1-a
NIC type: gVNIC
|File Count|VM Core Count|List Time Without Dataflux |List Time With Dataflux|
|------------|-------------|--------------------------|-----------------------|
|5000000 Obj |48 Core |319.72s |17.35s |
|1999032 Obj |48 Core |139.54s |8.98s |
|578703 Obj |48 Core |32.90s |5.71s |
|10448 Obj |48 Core |750.50ms |637.17ms |
27 changes: 27 additions & 0 deletions storage/dataflux/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package dataflux provides an easy way to parallelize listing in Google
Cloud Storage.

More information about Google Cloud Storage is available at
https://cloud.google.com/storage/docs.

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.

NOTE: This package is in preview. It is not stable, and is likely to change.
*/
package dataflux // import "cloud.google.com/go/storage/dataflux"
69 changes: 69 additions & 0 deletions storage/dataflux/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux_test

import (
"context"
"log"

"cloud.google.com/go/storage"
"cloud.google.com/go/storage/dataflux"
"google.golang.org/api/iterator"
)

// ExampleNextBatch_batch demonstrates using the dataflux Lister to count all
// objects in a bucket, batch by batch.
func ExampleNextBatch_batch() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx)
	if err != nil {
		// TODO: handle error.
	}

	// Create dataflux fast-list input and provide desired options,
	// including number of workers, batch size, query to filter objects, etc.
	in := &dataflux.ListerInput{
		BucketName: "mybucket",
		// Optionally specify params to apply to lister.
		Parallelism:          100,
		BatchSize:            500000,
		Query:                storage.Query{},
		SkipDirectoryObjects: false,
	}

	// Create Lister with desired options, including number of workers,
	// part size, per operation timeout, etc.
	df := dataflux.NewLister(client, in)
	defer df.Close()

	var numOfObjects int

	for {
		objects, err := df.NextBatch(ctx)
		// NextBatch returns iterator.Done (a non-nil error) together with the
		// final batch, so it must be checked before the generic error case;
		// the objects it carries still need to be counted.
		if err == iterator.Done {
			numOfObjects += len(objects)
			// No more objects in the bucket to list.
			break
		}
		if err != nil {
			// TODO: handle error.
		}
		numOfObjects += len(objects)
	}
	log.Printf("listing %d objects in bucket %q is complete.", numOfObjects, in.BucketName)
}
156 changes: 156 additions & 0 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"
"errors"
"fmt"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
)

// listingMethod represents the method of listing used by a Lister.
type listingMethod int

const (
// open when any method can be used to list; NextBatch runs the available
// methods in parallel and latches onto whichever finishes first.
open listingMethod = iota
// sequential when the listing is done sequentially (page by page).
sequential
// worksteal when the listing is done using work stealing algorithm.
worksteal
)

// ListerInput contains options for listing objects.
type ListerInput struct {
// BucketName is the name of the bucket to list objects from. Required.
BucketName string

// Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
Parallelism int

// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
// The number of objects returned will be rounded up to a multiple of gcs page size.
BatchSize int

// Query is the query to filter objects for listing. Default value is nil. Optional.
// Use ProjectionNoACL for faster listing. ACL is expensive and this results in fewer objects
// to be returned from GCS in each API call.
Query storage.Query

// SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
SkipDirectoryObjects bool
}

// Lister is used for interacting with Dataflux fast-listing.
// The caller should initialize it with NewLister() instead of creating it directly.
type Lister struct {
// method indicates the listing method(open, sequential, worksteal) to be used for listing.
method listingMethod

// pageToken is the token to use for sequential listing. An empty token after a
// NextBatch call means sequential listing is complete.
pageToken string

// bucket is the bucket handle to list objects from.
bucket *storage.BucketHandle

// batchSize is the number of objects to list per NextBatch call.
batchSize int

// query is the query to filter objects for listing.
query storage.Query

// skipDirectoryObjects is to indicate whether to list directory objects.
skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the given bucket.
// Release the Lister's resources with Close when done.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
	return &Lister{
		method:               open,
		pageToken:            "",
		bucket:               c.Bucket(in.BucketName),
		batchSize:            in.BatchSize,
		query:                in.Query,
		skipDirectoryObjects: in.SkipDirectoryObjects,
	}
}

// NextBatch runs worksteal algorithm and sequential listing in parallel to quickly
// return a list of objects in the bucket. For a smaller dataset,
// sequential listing is expected to be faster. For a larger dataset,
// worksteal listing is expected to be faster.
//
// NextBatch returns iterator.Done together with the final batch of objects
// once there is nothing left to list.
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
	// countError tracks the number of failed listing methods. It is written in
	// the errgroup goroutine and only read after g.Wait(), so no extra
	// synchronization is needed.
	countError := 0
	var results []*storage.ObjectAttrs
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Errgroup takes care of running both methods in parallel. As soon as one of
	// the methods is complete, the running method also stops.
	g, childCtx := errgroup.WithContext(ctx)

	// To start, the listing method is open and runs both worksteal and sequential
	// listing in parallel. The method which completes first is used for all
	// subsequent runs.
	// TODO: Run worksteal listing when method is open or worksteal.
	// Run sequential listing when method is open or sequential.
	if c.method != worksteal {
		g.Go(func() error {
			objects, nextToken, err := c.sequentialListing(childCtx)
			if err != nil {
				countError++
				return fmt.Errorf("error in running sequential listing: %w", err)
			}
			// If sequential listing completes first, set method to sequential
			// listing and ranges to nil. The nextToken will be used to continue
			// sequential listing on the next NextBatch call.
			results = objects
			c.pageToken = nextToken
			c.method = sequential
			// Cancel the context when sequential listing is complete so the
			// other (worksteal) method stops.
			cancel()
			return nil
		})
	}

	// Close all functions if either sequential listing or worksteal listing is complete.
	err := g.Wait()

	// If the error is not context.Canceled, then return error instead of falling back
	// to the other method. This is so that the error can be fixed and user can take
	// advantage of fast-listing.
	// As one of the listing methods completes, it is expected to cancel context for the
	// other method. If both sequential and worksteal listing fail due to context
	// canceled, only then return error.
	if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
		return nil, fmt.Errorf("failed waiting for sequential and work steal lister: %w", err)
	}

	// If ranges for worksteal and pageToken for sequential listing is empty, then
	// listing is complete.
	if c.pageToken == "" {
		return results, iterator.Done
	}
	return results, nil
}

// Close closes the range channel of the Lister.
// NOTE(review): currently a no-op — the worksteal lister and its range channel
// are not yet implemented, so there are no resources to release.
func (c *Lister) Close() {

// TODO: Close range channel for worksteal lister.
}
Loading

0 comments on commit cb7b0a1

Please sign in to comment.