Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit. Hold shift + click to select a range.
e8b49f8
storage: add backend interface and related types
boxofrad Mar 6, 2023
ae28bc8
storage: add conformance test suite
boxofrad Mar 6, 2023
531e995
storage: implement in-memory backend
boxofrad Mar 6, 2023
74eef9a
storage: better error messages
boxofrad Mar 6, 2023
2731ba8
storage: owner references should be anchored to a specific uid
boxofrad Mar 13, 2023
c25a7ec
Remove unused test function
boxofrad Mar 13, 2023
d5733f9
storage: clarify List docs
boxofrad Mar 14, 2023
56c4c3a
storage: fix potential out-of-order events
boxofrad Mar 14, 2023
033b612
storage: clarify WatchList consistency model
boxofrad Mar 15, 2023
f54bcb8
storage: s/Check-And-Set/Compare-And-Swap/
boxofrad Mar 15, 2023
0167a5b
storage: move event construction into publishEvent
boxofrad Mar 15, 2023
048712d
storage: document Read-Modify-Write patterns
boxofrad Mar 15, 2023
fbbbec9
storage: add clarifying comment about resource creation
boxofrad Mar 15, 2023
e17b50b
storage: clarify OwnerReferences consistency
boxofrad Mar 16, 2023
f441187
storage: separate ErrConflict into two errors
boxofrad Mar 17, 2023
84ff540
storage: make backends responsible for managing the version
boxofrad Mar 17, 2023
d7ac1c6
storage: make consistency an argument to Read
boxofrad Mar 17, 2023
ba94207
storage: add consistency parameter to List
boxofrad Mar 17, 2023
6e39fd5
storage: more correct consistency documentation
boxofrad Mar 17, 2023
4580e51
storage: fix integer alignment in inmem backend
boxofrad Mar 17, 2023
0748073
storage: support eventual consistency in conformance tests
boxofrad Mar 20, 2023
0a49eff
storage: correct eventLock comment
boxofrad Mar 20, 2023
8c75333
storage: rearrange inmem store files
boxofrad Mar 20, 2023
8bc4a80
storage: fix bug where watches could emit duplicate events
boxofrad Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
675 changes: 675 additions & 0 deletions internal/storage/conformance/conformance.go

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions internal/storage/inmem/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package inmem

import (
"context"
"strconv"
"sync/atomic"

"google.golang.org/protobuf/proto"

"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
)

// NewBackend returns a purely in-memory storage backend. It's suitable for
// testing and development mode, but should NOT be used in production as it
// has no support for durable persistence, so all of your data will be lost
// when the process restarts or crashes.
//
// You must call Run before using the backend.
func NewBackend() (*Backend, error) {
	s, err := NewStore()
	if err != nil {
		return nil, err
	}

	backend := &Backend{store: s}
	return backend, nil
}

// Backend is a purely in-memory storage backend implementation.
type Backend struct {
	// vsn is a global version counter, incremented atomically on every
	// write (see WriteCAS). It is deliberately the first field of the
	// struct so it stays 64-bit aligned, which sync/atomic requires on
	// 32-bit platforms.
	vsn uint64

	// store holds the actual resource data and watch machinery.
	store *Store
}

// Run until the given context is canceled. This method blocks, so should be
// called in a goroutine. It simply delegates to the underlying store's Run.
func (b *Backend) Run(ctx context.Context) { b.store.Run(ctx) }

// Read implements the storage.Backend interface.
//
// The consistency argument is ignored: the in-memory store holds a single
// copy of the data, so every read observes the most recent write.
func (b *Backend) Read(_ context.Context, _ storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
	return b.store.Read(id)
}

// WriteCAS implements the storage.Backend interface.
//
// It clones the given resource (so the caller's copy is never mutated),
// stamps the clone with a new globally-increasing version, and asks the
// store to Compare-And-Swap it against the caller-supplied version.
func (b *Backend) WriteCAS(_ context.Context, res *pbresource.Resource) (*pbresource.Resource, error) {
	stored := proto.Clone(res).(*pbresource.Resource)

	// Use FormatUint rather than Itoa(int(...)): converting the uint64
	// counter through int would truncate/overflow on 32-bit platforms.
	stored.Version = strconv.FormatUint(atomic.AddUint64(&b.vsn, 1), 10)

	if err := b.store.WriteCAS(stored, res.Version); err != nil {
		return nil, err
	}
	return stored, nil
}

// DeleteCAS implements the storage.Backend interface.
//
// The delete only succeeds if the stored resource's version matches the
// given version; the check itself is performed by the store.
func (b *Backend) DeleteCAS(_ context.Context, id *pbresource.ID, version string) error {
	return b.store.DeleteCAS(id, version)
}

// List implements the storage.Backend interface.
//
// The consistency argument is ignored: the in-memory store holds a single
// copy of the data, so listing is always strongly consistent.
func (b *Backend) List(_ context.Context, _ storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
	return b.store.List(resType, tenancy, namePrefix)
}

// WatchList implements the storage.Backend interface.
//
// It delegates to the store, which returns a storage.Watch emitting events
// for resources matching the given type, tenancy, and name prefix.
func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) {
	return b.store.WatchList(resType, tenancy, namePrefix)
}

// OwnerReferences implements the storage.Backend interface.
//
// It returns the IDs of resources whose Owner field references the given id.
func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) {
	return b.store.OwnerReferences(id)
}
28 changes: 28 additions & 0 deletions internal/storage/inmem/backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package inmem_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/conformance"
"github.com/hashicorp/consul/internal/storage/inmem"
)

// TestBackend_Conformance runs the shared storage conformance suite against
// the in-memory backend.
func TestBackend_Conformance(t *testing.T) {
	conformance.Test(t, conformance.TestOptions{
		NewBackend: func(t *testing.T) storage.Backend {
			backend, err := inmem.NewBackend()
			require.NoError(t, err)

			// Run the backend's event loop for the duration of the test;
			// cancellation via t.Cleanup stops the goroutine at test end.
			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			go backend.Run(ctx)

			return backend
		},
		// The in-memory store is a single copy of the data, so List is
		// strongly consistent.
		SupportsStronglyConsistentList: true,
	})
}
33 changes: 33 additions & 0 deletions internal/storage/inmem/event_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package inmem

import "github.com/hashicorp/go-memdb"

// meta is a row in the metadata table: an arbitrary value stored under a
// unique string key (e.g. the event index under metaKeyEventIndex).
type meta struct {
	Key   string
	Value any
}

// incrementEventIndex bumps the event index stored in the metadata table
// within the given transaction and returns the new index value.
func incrementEventIndex(tx *memdb.Txn) (uint64, error) {
	idx, err := currentEventIndex(tx)
	if err != nil {
		return 0, err
	}

	idx++
	if err := tx.Insert(tableNameMetadata, meta{Key: metaKeyEventIndex, Value: idx}); err != nil {
		// Fix: previously this returned (0, nil), silently swallowing the
		// insert error and handing callers a bogus zero index.
		return 0, err
	}
	return idx, nil
}

// currentEventIndex reads the event index from the metadata table within the
// given transaction, returning the reserved starting value if none has been
// written yet.
func currentEventIndex(tx *memdb.Txn) (uint64, error) {
	raw, err := tx.First(tableNameMetadata, indexNameID, metaKeyEventIndex)
	switch {
	case err != nil:
		return 0, err
	case raw == nil:
		// 0 and 1 index are reserved for special use in the stream package.
		return 2, nil
	default:
		return raw.(meta).Value.(uint64), nil
	}
}
247 changes: 247 additions & 0 deletions internal/storage/inmem/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package inmem

import (
"bytes"
"fmt"
"strings"

"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
)

const (
	// tableNameMetadata holds internal bookkeeping rows (see the meta
	// struct), currently just the event index.
	tableNameMetadata = "metadata"
	// tableNameResources holds the stored resources themselves.
	tableNameResources = "resources"

	// indexNameID is the unique primary index on both tables.
	indexNameID = "id"
	// indexNameOwner is the non-unique index of resources by their owner.
	indexNameOwner = "owner"

	// metaKeyEventIndex is the metadata key under which the event index
	// is stored.
	metaKeyEventIndex = "index"
)

// newDB creates the go-memdb database with two tables:
//
//   - metadata: internal bookkeeping keyed by string (unique "id" index on
//     the Key field)
//   - resources: resources indexed uniquely by ID, plus a non-unique,
//     optional index by owner
func newDB() (*memdb.MemDB, error) {
	return memdb.NewMemDB(&memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			tableNameMetadata: {
				Name: tableNameMetadata,
				Indexes: map[string]*memdb.IndexSchema{
					indexNameID: {
						Name:         indexNameID,
						AllowMissing: false,
						Unique:       true,
						Indexer:      &memdb.StringFieldIndex{Field: "Key"},
					},
				},
			},
			tableNameResources: {
				Name: tableNameResources,
				Indexes: map[string]*memdb.IndexSchema{
					indexNameID: {
						Name:         indexNameID,
						AllowMissing: false,
						Unique:       true,
						Indexer:      idIndexer{},
					},
					indexNameOwner: {
						Name: indexNameOwner,
						// AllowMissing because not every resource has an
						// owner (ownerIndexer.FromObject returns false for
						// a nil Owner).
						AllowMissing: true,
						Unique:       false,
						Indexer:      ownerIndexer{},
					},
				},
			},
		},
	})
}

// indexSeparator delimits the segments of our radix tree keys.
const indexSeparator = "\x00"

// idIndexer implements the memdb.Indexer, memdb.SingleIndexer and
// memdb.PrefixIndexer interfaces. It is used for indexing resources
// by their IDs (without the Uid segment).
type idIndexer struct{}

// FromArgs constructs a radix tree key from an ID for lookup. The single
// argument must be a *pbresource.ID.
func (i idIndexer) FromArgs(args ...any) ([]byte, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("expected 1 arg, got: %d", len(args))
	}

	id, ok := args[0].(*pbresource.ID)
	if !ok {
		return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0])
	}

	return indexFromID(id, false), nil
}

// FromObject constructs a radix tree key from a Resource at write-time, or an
// ID at delete-time.
func (i idIndexer) FromObject(raw any) (bool, []byte, error) {
	if id, ok := raw.(*pbresource.ID); ok {
		return true, indexFromID(id, false), nil
	}
	if res, ok := raw.(*pbresource.Resource); ok {
		return true, indexFromID(res.Id, false), nil
	}
	return false, nil, fmt.Errorf("expected *pbresource.Resource or *pbresource.ID, got: %T", raw)
}

// PrefixFromArgs constructs a radix tree key prefix from a query for listing.
// The single argument must be a query value.
func (i idIndexer) PrefixFromArgs(args ...any) ([]byte, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("expected 1 arg, got: %d", len(args))
	}

	q, ok := args[0].(query)
	if !ok {
		return nil, fmt.Errorf("expected query, got: %T", args[0])
	}

	return q.indexPrefix(), nil
}

// ownerIndexer implements the memdb.Indexer and memdb.SingleIndexer interfaces.
// It is used for indexing resources by their owners (including the owner Uid).
type ownerIndexer struct{}

// FromArgs constructs a radix tree key from an owner ID for lookup. The
// single argument must be a *pbresource.ID; the Uid is included in the key.
func (i ownerIndexer) FromArgs(args ...any) ([]byte, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("expected 1 arg, got: %d", len(args))
	}

	id, ok := args[0].(*pbresource.ID)
	if !ok {
		return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0])
	}

	return indexFromID(id, true), nil
}

// FromObject constructs a radix tree key from a Resource at write-time.
// Resources without an owner are not indexed (the index allows missing
// entries).
func (i ownerIndexer) FromObject(raw any) (bool, []byte, error) {
	res, ok := raw.(*pbresource.Resource)
	if !ok {
		return false, nil, fmt.Errorf("expected *pbresource.Resource, got: %T", raw)
	}

	owner := res.Owner
	if owner == nil {
		return false, nil, nil
	}
	return true, indexFromID(owner, true), nil
}

// indexFromType encodes a resource type as the leading segments of a radix
// tree key: <group><kind>, each NULL-terminated.
func indexFromType(t storage.UnversionedType) []byte {
	var builder indexBuilder
	for _, segment := range []string{t.Group, t.Kind} {
		builder.String(segment)
	}
	return builder.Bytes()
}

// indexFromTenancy encodes tenancy as radix tree key segments in the order
// <partition><peer><namespace>, each NULL-terminated.
func indexFromTenancy(t *pbresource.Tenancy) []byte {
	var builder indexBuilder
	for _, segment := range []string{t.Partition, t.PeerName, t.Namespace} {
		builder.String(segment)
	}
	return builder.Bytes()
}

// indexFromID builds the full radix tree key for a resource ID:
// <type><tenancy><name>, optionally followed by the Uid (used by the owner
// index so references are anchored to a specific uid).
func indexFromID(id *pbresource.ID, includeUid bool) []byte {
	var builder indexBuilder

	builder.Raw(indexFromType(storage.UnversionedTypeFrom(id.Type)))
	builder.Raw(indexFromTenancy(id.Tenancy))
	builder.String(id.Name)

	if includeUid {
		builder.String(id.Uid)
	}

	return builder.Bytes()
}

// indexBuilder accumulates the segments of a radix tree key. It is a thin
// wrapper around bytes.Buffer that NULL-terminates string segments.
type indexBuilder bytes.Buffer

// buf views the builder as the underlying *bytes.Buffer.
func (i *indexBuilder) buf() *bytes.Buffer { return (*bytes.Buffer)(i) }

// Raw appends v verbatim, with no terminator.
func (i *indexBuilder) Raw(v []byte) {
	i.buf().Write(v)
}

// String appends s followed by the NULL segment separator.
func (i *indexBuilder) String(s string) {
	b := i.buf()
	b.WriteString(s)
	b.WriteString(indexSeparator)
}

// Bytes returns the accumulated key.
func (i *indexBuilder) Bytes() []byte {
	return i.buf().Bytes()
}

// query captures the parameters of a list/watch operation: the resource
// type, the (possibly wildcarded) tenancy, and an optional name prefix.
type query struct {
	resourceType storage.UnversionedType
	tenancy      *pbresource.Tenancy
	namePrefix   string
}

// indexPrefix is called by idIndexer.PrefixFromArgs to construct a radix tree
// key prefix for list queries.
//
// Our radix tree keys are structured like so:
//
//	<type><partition><peer><namespace><name>
//
// Where each segment is followed by a NULL terminator.
//
// In order to handle wildcard queries, we return a prefix up to the wildcarded
// field. For example:
//
//	Query:  type={mesh,v1,service}, partition=default, peer=*, namespace=default
//	Prefix: mesh[NULL]v1[NULL]service[NULL]default[NULL]
//
// Which means that we must manually apply filters after the wildcard (i.e.
// namespace in the above example) in the matches method.
func (q query) indexPrefix() []byte {
	var builder indexBuilder
	builder.Raw(indexFromType(q.resourceType))

	// Append tenancy segments in key order, stopping at the first wildcard:
	// everything after it must be filtered by matches instead.
	for _, segment := range []string{q.tenancy.Partition, q.tenancy.PeerName, q.tenancy.Namespace} {
		if segment == storage.Wildcard {
			return builder.Bytes()
		}
		builder.String(segment)
	}

	// The name prefix is appended raw (no NULL terminator) because it is a
	// prefix of the name segment, not a complete segment.
	if q.namePrefix != "" {
		builder.Raw([]byte(q.namePrefix))
	}

	return builder.Bytes()
}

// matches applies filters that couldn't be applied by just doing a radix tree
// prefix scan, because an earlier segment of the key prefix was wildcarded.
//
// See docs on query.indexPrefix for an example.
func (q query) matches(res *pbresource.Resource) bool {
	// A tenancy field matches when the query wildcards it or the values are
	// equal.
	accept := func(want, got string) bool {
		return want == storage.Wildcard || want == got
	}

	tenancy := res.Id.Tenancy
	if !accept(q.tenancy.Partition, tenancy.Partition) {
		return false
	}
	if !accept(q.tenancy.PeerName, tenancy.PeerName) {
		return false
	}
	if !accept(q.tenancy.Namespace, tenancy.Namespace) {
		return false
	}

	return len(q.namePrefix) == 0 || strings.HasPrefix(res.Id.Name, q.namePrefix)
}
Loading