Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move extension/storage to extension/experimental/storage #4082

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions exporter/exporterhelper/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/extension/storage"
"go.opentelemetry.io/collector/extension/experimental/storageextension"
)

// persistentQueue holds the queue backed by file storage
Expand All @@ -38,7 +38,7 @@ type persistentQueue struct {
}

// newPersistentQueue creates a new queue backed by file storage; name parameter must be a unique value that identifies the queue
func newPersistentQueue(ctx context.Context, name string, capacity int, logger *zap.Logger, client storage.Client, unmarshaler requestUnmarshaler) *persistentQueue {
func newPersistentQueue(ctx context.Context, name string, capacity int, logger *zap.Logger, client storageextension.Client, unmarshaler requestUnmarshaler) *persistentQueue {
return &persistentQueue{
logger: logger,
stopChan: make(chan struct{}),
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/storage"
"go.opentelemetry.io/collector/extension/experimental/storageextension"
"go.opentelemetry.io/collector/model/pdata"
)

func createTestQueue(extension storage.Extension, capacity int) *persistentQueue {
func createTestQueue(extension storageextension.Extension, capacity int) *persistentQueue {
logger, _ := zap.NewDevelopment()

client, err := extension.GetClient(context.Background(), component.KindReceiver, config.ComponentID{}, "")
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opencensus.io/metric/metricdata"
"go.uber.org/zap"

"go.opentelemetry.io/collector/extension/storage"
"go.opentelemetry.io/collector/extension/experimental/storageextension"
)

// persistentStorage provides an interface for request storage operations
Expand Down Expand Up @@ -69,7 +69,7 @@ type persistentStorage interface {
type persistentContiguousStorage struct {
logger *zap.Logger
queueName string
client storage.Client
client storageextension.Client
unmarshaler requestUnmarshaler

putChan chan struct{}
Expand Down Expand Up @@ -109,7 +109,7 @@ var (
// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
// The queue needs to be initialized separately using initPersistentContiguousStorage.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler requestUnmarshaler) *persistentContiguousStorage {
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storageextension.Client, unmarshaler requestUnmarshaler) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
client: client,
Expand Down
16 changes: 8 additions & 8 deletions exporter/exporterhelper/persistent_storage_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"go.uber.org/zap"

"go.opentelemetry.io/collector/extension/storage"
"go.opentelemetry.io/collector/extension/experimental/storageextension"
)

var errItemIndexArrInvalidDataType = errors.New("invalid data type, expected []itemIndex")
Expand All @@ -35,16 +35,16 @@ type batchStruct struct {
logger *zap.Logger
pcs *persistentContiguousStorage

operations []storage.Operation
getOperations map[string]storage.Operation
operations []storageextension.Operation
getOperations map[string]storageextension.Operation
}

func newBatch(pcs *persistentContiguousStorage) *batchStruct {
return &batchStruct{
logger: pcs.logger,
pcs: pcs,
operations: []storage.Operation{},
getOperations: map[string]storage.Operation{},
operations: []storageextension.Operation{},
getOperations: map[string]storageextension.Operation{},
}
}

Expand All @@ -64,7 +64,7 @@ func (bof *batchStruct) set(key string, value interface{}, marshal func(interfac
if err != nil {
bof.logger.Debug("Failed marshaling item, skipping it", zap.String(zapKey, key), zap.Error(err))
} else {
bof.operations = append(bof.operations, storage.SetOperation(key, valueBytes))
bof.operations = append(bof.operations, storageextension.SetOperation(key, valueBytes))
}

return bof
Expand All @@ -73,7 +73,7 @@ func (bof *batchStruct) set(key string, value interface{}, marshal func(interfac
// get adds a Get operation to the batch. After executing, its result will be available through getResult
func (bof *batchStruct) get(keys ...string) *batchStruct {
for _, key := range keys {
op := storage.GetOperation(key)
op := storageextension.GetOperation(key)
bof.getOperations[key] = op
bof.operations = append(bof.operations, op)
}
Expand All @@ -84,7 +84,7 @@ func (bof *batchStruct) get(keys ...string) *batchStruct {
// delete adds a Delete operation to the batch
func (bof *batchStruct) delete(keys ...string) *batchStruct {
for _, key := range keys {
bof.operations = append(bof.operations, storage.DeleteOperation(key))
bof.operations = append(bof.operations, storageextension.DeleteOperation(key))
}

return bof
Expand Down
24 changes: 12 additions & 12 deletions exporter/exporterhelper/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/storage"
"go.opentelemetry.io/collector/extension/experimental/storageextension"
)

func createStorageExtension(_ string) storage.Extension {
func createStorageExtension(_ string) storageextension.Extension {
// After having storage moved to core, we could leverage storagetest.NewTestExtension(nil, path)
return newMockStorageExtension()
}

func createTestClient(extension storage.Extension) storage.Client {
func createTestClient(extension storageextension.Extension) storageextension.Client {
client, err := extension.GetClient(context.Background(), component.KindReceiver, config.ComponentID{}, "")
if err != nil {
panic(err)
}
return client
}

func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
func createTestPersistentStorageWithLoggingAndCapacity(client storageextension.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newTraceRequestUnmarshalerFunc(nopTracePusher()))
}

func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage {
func createTestPersistentStorage(client storageextension.Client) *persistentContiguousStorage {
logger, _ := zap.NewDevelopment()
return createTestPersistentStorageWithLoggingAndCapacity(client, logger, 1000)
}
Expand Down Expand Up @@ -332,15 +332,15 @@ func (m mockStorageExtension) Shutdown(_ context.Context) error {
return nil
}

func (m mockStorageExtension) GetClient(ctx context.Context, kind component.Kind, id config.ComponentID, s string) (storage.Client, error) {
func (m mockStorageExtension) GetClient(ctx context.Context, kind component.Kind, id config.ComponentID, s string) (storageextension.Client, error) {
return newMockStorageClient(), nil
}

func newMockStorageExtension() storage.Extension {
func newMockStorageExtension() storageextension.Extension {
return &mockStorageExtension{}
}

func newMockStorageClient() storage.Client {
func newMockStorageClient() storageextension.Client {
return &mockStorageClient{
st: map[string][]byte{},
}
Expand Down Expand Up @@ -383,17 +383,17 @@ func (m *mockStorageClient) Close(_ context.Context) error {
return nil
}

func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
func (m *mockStorageClient) Batch(_ context.Context, ops ...storageextension.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

for _, op := range ops {
switch op.Type {
case storage.Get:
case storageextension.Get:
op.Value = m.st[op.Key]
case storage.Set:
case storageextension.Set:
m.st[op.Key] = op.Value
case storage.Delete:
case storageextension.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/queued_retry_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/extension/storage"
"go.opentelemetry.io/collector/extension/experimental/storageextension"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

Expand Down Expand Up @@ -101,10 +101,10 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
return qrs
}

func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
var storageExtension storage.Extension
func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storageextension.Client, error) {
var storageExtension storageextension.Extension
for _, ext := range host.GetExtensions() {
if se, ok := ext.(storage.Extension); ok {
if se, ok := ext.(storageextension.Extension); ok {
if storageExtension != nil {
return nil, errMultipleStorageClients
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

A storage extension persists state beyond the collector process. Other components can request a storage client from the storage extension and use it to manage state.

The `storage.Extension` interface extends `component.Extension` by adding the following method:
The `storageextension.Extension` interface extends `component.Extension` by adding the following method:
```
GetClient(context.Context, component.Kind, config.ComponentID, string) (Client, error)
```

The `storage.Client` interface contains the following methods:
The `storageextension.Client` interface contains the following methods:
```
Get(context.Context, string) ([]byte, error)
Set(context.Context, string, []byte) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

// Package stanzareceiver implements a receiver that can be used by the
// Opentelemetry collector to receive logs using the stanza log agent
package storage
package storageextension
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package storage
package storageextension

import "context"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package storage
package storageextension

import (
"context"
Expand Down