From e8b49f899bfc1b9d1d018adf37a70fe6fbd2c350 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Mon, 6 Mar 2023 12:09:42 +0000 Subject: [PATCH 01/24] storage: add backend interface and related types --- internal/storage/storage.go | 220 ++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 internal/storage/storage.go diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 00000000000..7907bb1a7e1 --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,220 @@ +package storage + +import ( + "context" + "errors" + "fmt" + + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Wildcard can be given as Tenancy fields in List and Watch calls, to enumerate +// resources across multiple partitions, peers, namespaces, etc. +const Wildcard = "*" + +var ( + // ErrNotFound indicates that the resource could not be found. + ErrNotFound = errors.New("resource not found") + + // ErrConflict indicates that the attempted write failed because of a version + // or UID mismatch. + ErrConflict = errors.New("CAS operation failed with conflict") + + // ErrInconsistent indicates that the attempted write or consistent read could + // not be achieved because of a consistency or availability issue (e.g. loss of + // quorum, or when interacting with a Raft follower). + ErrInconsistent = errors.New("cannot satisfy required consistency") +) + +// Backend provides the low-level storage substrate for resources. It can be +// implemented using internal (i.e. Raft+MemDB) or external (e.g. DynamoDB) +// storage systems. +// +// Refer to the method comments for details of the behaviors and invariants +// provided, which are also verified by the conformance test suite in the +// internal/storage/conformance package. +// +// Cross-cutting concerns: +// +// # UIDs +// +// Users identify resources with a name of their choice (e.g. service "billing") +// but internally, we add our own identifier in the Uid field to disambiguate +// references when resources are deleted and re-created with the same name. +// +// # GroupVersion +// +// In order to support automatic translation between schema versions, we only +// store a single version of a resource, and treat types with the same Group +// and Kind, but different GroupVersions, as equivalent. +type Backend interface { + // Read a resource using its ID. + // + // # UIDs + // + // If id.Uid is empty, Read will ignore it and return whatever resource is + // stored with the given name. This is the desired behavior for user-initiated + // reads. + // + // If id.Uid is non-empty, Read will only return a resource if its Uid matches, + // otherwise it'll return ErrNotFound. This is the desired behaviour for reads + // initiated by controllers, which tend to operate on a specific lifetime of a + // resource. + // + // See Backend docs for more details. + // + // # GroupVersion + // + // If id.Type.GroupVersion doesn't match what is stored, Read will return a + // GroupVersionMismatchError which contains a pointer to the stored resource. + // + // See Backend docs for more details. + // + // # Consistency + // + // Read makes no guarantees about consistency, and may return stale results. + // For stronger guarantees, use ReadConsistent. 
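+	//
+	// As an illustrative sketch (hypothetical caller code, not part of this
+	// interface), the error cases above might be handled like so:
+	//
+	//	res, err := backend.Read(ctx, id)
+	//	switch {
+	//	case errors.Is(err, storage.ErrNotFound):
+	//		// The resource (or the given Uid) doesn't exist.
+	//	case err != nil:
+	//		var mismatch storage.GroupVersionMismatchError
+	//		if errors.As(err, &mismatch) {
+	//			// Translate mismatch.Stored to the requested GroupVersion.
+	//		}
+	//	}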
+ Read(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) + + // ReadConsistent provides the same functionality as Read, but guarantees + // single-resource sequential consistency, typically by bypassing any caches + // and proxying the request directly to the underlying storage system. + // + // If a consistent read cannot be achieved (e.g. when interacting with a Raft + // follower, or quorum is lost) ErrInconsistent will be returned. + // + // Use ReadConsistent sparingly, and prefer Read when possible. + ReadConsistent(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) + + // WriteCAS performs an atomic CAS (Check-And-Set) write of a resource based + // on its version. The given version will be compared to what is stored, and + // if it does not match, ErrConflict will be returned. To create new resources, + // pass an empty version string. + // + // If a write cannot be performed because of a consistency or availability + // issue (e.g. when interacting with a Raft follower, or when quorum is lost) + // ErrInconsistent will be returned. + // + // # UIDs + // + // UIDs are immutable, so if the given resource's Uid field doesn't match what + // is stored, ErrConflict will be returned. + // + // See Backend docs for more details. + // + // # GroupVersion + // + // Write does not validate the GroupVersion and allows you to overwrite a + // resource stored in an older form with a newer, and vice versa. + // + // See Backend docs for more details. + WriteCAS(ctx context.Context, res *pbresource.Resource, version string) (*pbresource.Resource, error) + + // DeleteCAS performs an atomic CAS (Check-And-Set) deletion of a resource + // based on its version. The given version will be compared to what is stored, + // and if it does not match, ErrConflict will be returned. + // + // If the resource does not exist (i.e. has already been deleted) no error will + // be returned. + // + // If a deletion cannot be performed because of a consistency or availability + // issue (e.g. when interacting with a Raft follower, or when quorum is lost) + // ErrInconsistent will be returned. + // + // # UIDs + // + // If the given id's Uid does not match what is stored, the deletion will be a + // no-op (i.e. it is considered to be a different resource). + // + // See Backend docs for more details. + // + // # GroupVersion + // + // Delete does not check or refer to the GroupVersion. Resources of the same + // Group and Kind are considered equivalent, so requests to delete a resource + // using a new GroupVersion will delete a resource even if it's stored with an + // old GroupVersion. + // + // See Backend docs for more details. + DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error + + // List resources of the given type, tenancy, and optionally matching the given + // name prefix. + // + // # Tenancy Wildcard + // + // In order to list resources across multiple tenancy units (e.g. partitions) + // pass the Wildcard sentinel value in tenancy fields. + // + // # GroupVersion + // + // The resType argument contains only the Group and Kind, to reflect the fact + // that resources may be stored in a mix of old and new forms. As such, it's + // the caller's responsibility to check the resource's GroupVersion and + // translate or filter accordingly. + // + // # Consistency + // + // List makes no guarantees about consistency, and may return stale results. 
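	//
	// For example, this sketch (using a hypothetical "catalog"/"Service" type)
	// lists every such resource across all namespaces of one partition:
	//
	//	resources, err := backend.List(ctx,
	//		storage.UnversionedType{Group: "catalog", Kind: "Service"},
	//		&pbresource.Tenancy{
	//			Partition: "default",
	//			PeerName:  "local",
	//			Namespace: storage.Wildcard,
	//		},
	//		"",
	//	)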
+ List(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) + + // WatchList watches resources of the given type, tenancy, and optionally + // matching the given name prefix. Upsert events for the current state of the + // world (i.e. existing resources that match the given filters) will be emitted + // immediately, and will be followed by delta events whenever resources are + // written or deleted. + // + // See List docs for details about Tenancy Wildcard, GroupVersion, and + // Consistency. + WatchList(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (Watch, error) + + // OwnerReferences returns the IDs of resources owned by the resource with the + // given ID. It is typically used to implement cascading deletion. + // + // # Consistency + // + // OwnerReferences makes no guarantees about consistency, and may return stale + // results. + OwnerReferences(ctx context.Context, id *pbresource.ID) ([]*pbresource.ID, error) +} + +// Watch represents a watch on a given set of resources. Call Next to get the +// next event (i.e. upsert or deletion). +type Watch interface { + // Next returns the next event (i.e. upsert or deletion) + Next(ctx context.Context) (*pbresource.WatchEvent, error) +} + +// UnversionedType represents a pbresource.Type as it is stored without the +// GroupVersion. +type UnversionedType struct { + Group string + Kind string +} + +// UnversionedTypeFrom creates an UnversionedType from the given *pbresource.Type. +func UnversionedTypeFrom(t *pbresource.Type) UnversionedType { + return UnversionedType{ + Group: t.Group, + Kind: t.Kind, + } +} + +// GroupVersionMismatchError is returned when a resource is stored as a type +// with a different GroupVersion than was requested. +type GroupVersionMismatchError struct { + // RequestedType is the type that was requested. + RequestedType *pbresource.Type + + // Stored is the resource as it is stored. + Stored *pbresource.Resource +} + +// Error implements the error interface. +func (e GroupVersionMismatchError) Error() string { + return fmt.Sprintf( + "resource was requested with GroupVersion=%q, but stored with GroupVersion=%q", + e.RequestedType.GroupVersion, + e.Stored.Id.Type.GroupVersion, + ) +} From ae28bc862595f6ae9648953fc251e60fc90875c8 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Mon, 6 Mar 2023 12:10:11 +0000 Subject: [PATCH 02/24] storage: add conformance test suite --- internal/storage/conformance/conformance.go | 611 ++++++++++++++++++++ 1 file changed, 611 insertions(+) create mode 100644 internal/storage/conformance/conformance.go diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go new file mode 100644 index 00000000000..e1c23150c6c --- /dev/null +++ b/internal/storage/conformance/conformance.go @@ -0,0 +1,611 @@ +package conformance + +import ( + "context" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +type TestOptions struct { + // NewBackend will be called to construct a storage.Backend to run the tests + // against. + NewBackend func(t *testing.T) storage.Backend +} + +// Test runs a suite of tests against a storage.Backend implementation to check +// it correctly implements our required behaviours. 
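//
// A minimal wiring sketch (the in-memory backend's own test does exactly
// this; newTestBackend is a hypothetical helper that constructs and starts
// the backend under test):
//
//	conformance.Test(t, conformance.TestOptions{
//		NewBackend: func(t *testing.T) storage.Backend {
//			return newTestBackend(t)
//		},
//	})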
+// +// Note: it currently checks for stronger consistency than we actually need. We +// will need to handle eventual consistency when we implement the Raft backend. +func Test(t *testing.T, opts TestOptions) { + require.NotNil(t, opts.NewBackend, "NewBackend method is required") + + t.Run("Read", func(t *testing.T) { testRead(t, opts) }) + t.Run("CAS Write", func(t *testing.T) { testCASWrite(t, opts) }) + t.Run("CAS Delete", func(t *testing.T) { testCASDelete(t, opts) }) + t.Run("OwnerReferences", func(t *testing.T) { testOwnerReferences(t, opts) }) + + testListWatch(t, opts) +} + +func testRead(t *testing.T, opts TestOptions) { + ctx := testContext(t) + backend := opts.NewBackend(t) + + res := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + Version: "1", + } + _, err := backend.WriteCAS(ctx, res, "") + require.NoError(t, err) + + t.Run("simple", func(t *testing.T) { + output, err := backend.Read(ctx, res.Id) + require.NoError(t, err) + require.Equal(t, res, output) + }) + + t.Run("no uid", func(t *testing.T) { + id := clone(res.Id) + id.Uid = "" + + output, err := backend.Read(ctx, id) + require.NoError(t, err) + require.Equal(t, res, output) + }) + + t.Run("different id", func(t *testing.T) { + id := clone(res.Id) + id.Name = "different" + + _, err := backend.Read(ctx, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + + t.Run("different uid", func(t *testing.T) { + id := clone(res.Id) + id.Uid = "b" + + _, err := backend.Read(ctx, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + + t.Run("different GroupVersion", func(t *testing.T) { + id := clone(res.Id) + id.Type = typeAv2 + + _, err := backend.Read(ctx, id) + require.Error(t, err) + + var e storage.GroupVersionMismatchError + if errors.As(err, &e) { + require.Equal(t, id.Type, e.RequestedType) + require.Equal(t, res, e.Stored) + } else { + t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err) + } + }) +} + +func testCASWrite(t *testing.T, opts TestOptions) { + t.Run("version-based CAS", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + v1 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + Version: "1", + } + + _, err := backend.WriteCAS(ctx, v1, "some-version") + require.ErrorIs(t, err, storage.ErrConflict) + + _, err = backend.WriteCAS(ctx, v1, "") + require.NoError(t, err) + + v2 := clone(v1) + v2.Version = "2" + + _, err = backend.WriteCAS(ctx, v2, v1.Version) + require.NoError(t, err) + + v3 := clone(v2) + v3.Version = "3" + + _, err = backend.WriteCAS(ctx, v3, "") + require.ErrorIs(t, err, storage.ErrConflict) + + _, err = backend.WriteCAS(ctx, v3, v1.Version) + require.ErrorIs(t, err, storage.ErrConflict) + }) + + t.Run("uid immutability", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + v1 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + Version: "1", + } + _, err := backend.WriteCAS(ctx, v1, "") + require.NoError(t, err) + + // Uid cannot change. 
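+		// The three writes below pin this down: an empty Uid and a different
+		// Uid are both rejected, and only the original Uid is accepted.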
+		v2 := clone(v1)
+		v2.Version = "2"
+
+		v2.Id.Uid = ""
+		_, err = backend.WriteCAS(ctx, v2, v1.Version)
+		require.Error(t, err)
+
+		v2.Id.Uid = "b"
+		_, err = backend.WriteCAS(ctx, v2, v1.Version)
+		require.ErrorIs(t, err, storage.ErrConflict)
+
+		v2.Id.Uid = v1.Id.Uid
+		_, err = backend.WriteCAS(ctx, v2, v1.Version)
+		require.NoError(t, err)
+
+		// Uid can change after original resource is deleted.
+		require.NoError(t, backend.DeleteCAS(ctx, v2.Id, v2.Version))
+
+		v3 := clone(v2)
+		v3.Version = "3"
+		v3.Id.Uid = "b"
+
+		_, err = backend.WriteCAS(ctx, v3, "")
+		require.NoError(t, err)
+	})
+}
+
+func testCASDelete(t *testing.T, opts TestOptions) {
+	t.Run("version-based CAS", func(t *testing.T) {
+		backend := opts.NewBackend(t)
+		ctx := testContext(t)
+
+		res := &pbresource.Resource{
+			Id: &pbresource.ID{
+				Type:    typeB,
+				Tenancy: tenancyDefault,
+				Name:    "web",
+				Uid:     "a",
+			},
+			Version: "1",
+		}
+		_, err := backend.WriteCAS(ctx, res, "")
+		require.NoError(t, err)
+
+		require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, ""), storage.ErrConflict)
+		require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "2"), storage.ErrConflict)
+
+		require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version))
+
+		_, err = backend.Read(ctx, res.Id)
+		require.ErrorIs(t, err, storage.ErrNotFound)
+	})
+
+	t.Run("uid must match", func(t *testing.T) {
+		backend := opts.NewBackend(t)
+		ctx := testContext(t)
+
+		res := &pbresource.Resource{
+			Id: &pbresource.ID{
+				Type:    typeB,
+				Tenancy: tenancyDefault,
+				Name:    "web",
+				Uid:     "a",
+			},
+			Version: "1",
+		}
+		_, err := backend.WriteCAS(ctx, res, "")
+		require.NoError(t, err)
+
+		id := clone(res.Id)
+		id.Uid = "b"
+		require.NoError(t, backend.DeleteCAS(ctx, id, res.Version))
+
+		_, err = backend.Read(ctx, res.Id)
+		require.NoError(t, err)
+	})
+}
+
+func testListWatch(t *testing.T, opts TestOptions) {
+	testCases := map[string]struct {
+		resourceType storage.UnversionedType
+		tenancy      *pbresource.Tenancy
+		namePrefix   string
+		results      []*pbresource.Resource
+	}{
+		"simple #1": {
+			resourceType: storage.UnversionedTypeFrom(typeAv1),
+			tenancy:      tenancyDefault,
+			namePrefix:   "",
+			results: []*pbresource.Resource{
+				seedData[0],
+				seedData[1],
+				seedData[2],
+			},
+		},
+		"simple #2": {
+			resourceType: storage.UnversionedTypeFrom(typeAv1),
+			tenancy:      tenancyOther,
+			namePrefix:   "",
+			results: []*pbresource.Resource{
+				seedData[3],
+			},
+		},
+		"fixed tenancy, name prefix": {
+			resourceType: storage.UnversionedTypeFrom(typeAv1),
+			tenancy:      tenancyDefault,
+			namePrefix:   "a",
+			results: []*pbresource.Resource{
+				seedData[0],
+				seedData[1],
+			},
+		},
+		"wildcard tenancy": {
+			resourceType: storage.UnversionedTypeFrom(typeAv1),
+			tenancy: &pbresource.Tenancy{
+				Partition: storage.Wildcard,
+				PeerName:  storage.Wildcard,
+				Namespace: storage.Wildcard,
+			},
+			namePrefix: "",
+			results: []*pbresource.Resource{
+				seedData[0],
+				seedData[1],
+				seedData[2],
+				seedData[3],
+				seedData[5],
+				seedData[6],
+			},
+		},
+		"fixed partition, wildcard peer, wildcard namespace": {
+			resourceType: storage.UnversionedTypeFrom(typeAv1),
+			tenancy: &pbresource.Tenancy{
+				Partition: "default",
+				PeerName:  storage.Wildcard,
+				Namespace: storage.Wildcard,
+			},
+			namePrefix: "",
+			results: []*pbresource.Resource{
+				seedData[0],
+				seedData[1],
+				seedData[2],
+				seedData[5],
+				seedData[6],
+			},
+		},
+		"wildcard partition, fixed peer, wildcard namespace": {
+			resourceType: storage.UnversionedTypeFrom(typeAv1),
+			tenancy: &pbresource.Tenancy{
+				Partition: storage.Wildcard,
+				PeerName:  "local",
+				Namespace: 
storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[3], + seedData[5], + }, + }, + "wildcard partition, wildcard peer, fixed namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: "default", + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[6], + }, + }, + "fixed partition, fixed peer, wildcard namespace": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: storage.Wildcard, + }, + namePrefix: "", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[2], + seedData[5], + }, + }, + "wildcard tenancy, name prefix": { + resourceType: storage.UnversionedTypeFrom(typeAv1), + tenancy: &pbresource.Tenancy{ + Partition: storage.Wildcard, + PeerName: storage.Wildcard, + Namespace: storage.Wildcard, + }, + namePrefix: "a", + results: []*pbresource.Resource{ + seedData[0], + seedData[1], + seedData[3], + seedData[5], + seedData[6], + }, + }, + } + + t.Run("List", func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r, "") + require.NoError(t, err) + } + + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + res, err := backend.List(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + require.ElementsMatch(t, res, tc.results) + }) + } + }) + + t.Run("WatchList", func(t *testing.T) { + for desc, tc := range testCases { + t.Run(fmt.Sprintf("%s - initial snapshot", desc), func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + // Write the seed data before the watch has been established. + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r, "") + require.NoError(t, err) + } + + watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + + for i := 0; i < len(tc.results); i++ { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) + require.Containsf(t, tc.results, event.Resource, "resource not in expected results: %s", event.Resource.Id) + } + }) + + t.Run(fmt.Sprintf("%s - following events", desc), func(t *testing.T) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + + // Write the seed data after the watch has been established. + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r, "") + require.NoError(t, err) + } + + for i := 0; i < len(tc.results); i++ { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) + require.Containsf(t, tc.results, event.Resource, "resource not in expected results: %s", event.Resource.Id) + } + + // Delete a random resource to check we get an event. 
+ del := tc.results[rand.Intn(len(tc.results))] + require.NoError(t, backend.DeleteCAS(ctx, del.Id, del.Version)) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + t.Cleanup(cancel) + + event, err := watch.Next(ctx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, event.Operation) + require.Equal(t, del, event.Resource) + }) + } + }) +} + +func testWatch(t *testing.T, opts TestOptions) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + r1 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + Version: "1", + } + _, err := backend.WriteCAS(ctx, r1, "") + require.NoError(t, err) + + watch, err := backend.WatchList(ctx, storage.UnversionedTypeFrom(typeAv1), tenancyDefault, "") + require.NoError(t, err) + + timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + t.Cleanup(cancel) + event, err := watch.Next(timeoutCtx) + require.NoError(t, err) + + require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT.String(), event.Operation.String()) + require.Equal(t, r1, event.Resource) +} + +func testOwnerReferences(t *testing.T, opts TestOptions) { + backend := opts.NewBackend(t) + ctx := testContext(t) + + owner := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "owner", + Uid: "a", + }, + Version: "1", + } + _, err := backend.WriteCAS(ctx, owner, "") + require.NoError(t, err) + + r1 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeB, + Tenancy: tenancyDefault, + Name: "r1", + Uid: "a", + }, + Owner: owner.Id, + Version: "1", + } + _, err = backend.WriteCAS(ctx, r1, "") + require.NoError(t, err) + + r2 := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv2, + Tenancy: tenancyDefault, + Name: "r2", + Uid: "a", + }, + Owner: owner.Id, + Version: "1", + } + _, err = backend.WriteCAS(ctx, r2, "") + require.NoError(t, err) + + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + + t.Run("deleting the owner doesn't remove the references", func(t *testing.T) { + require.NoError(t, backend.DeleteCAS(ctx, owner.Id, owner.Version)) + + refs, err = backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) + + t.Run("deleting the owned resource removes its reference", func(t *testing.T) { + require.NoError(t, backend.DeleteCAS(ctx, r2.Id, r2.Version)) + + refs, err = backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id}) + }) +} + +var ( + typeAv1 = &pbresource.Type{ + Group: "test", + GroupVersion: "v1", + Kind: "a", + } + typeAv2 = &pbresource.Type{ + Group: "test", + GroupVersion: "v2", + Kind: "a", + } + typeB = &pbresource.Type{ + Group: "test", + GroupVersion: "v1", + Kind: "b", + } + tenancyDefault = &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: "default", + } + + tenancyDefaultOtherNamespace = &pbresource.Tenancy{ + Partition: "default", + PeerName: "local", + Namespace: "other", + } + tenancyDefaultOtherPeer = &pbresource.Tenancy{ + Partition: "default", + PeerName: "remote", + Namespace: "default", + } + tenancyOther = &pbresource.Tenancy{ + Partition: "billing", + PeerName: "local", + Namespace: "payments", + } + + seedData = []*pbresource.Resource{ + resource(typeAv1, tenancyDefault, "admin"), // 0 + resource(typeAv1, tenancyDefault, 
"api"), // 1 + resource(typeAv2, tenancyDefault, "web"), // 2 + resource(typeAv1, tenancyOther, "api"), // 3 + resource(typeB, tenancyDefault, "admin"), // 4 + resource(typeAv1, tenancyDefaultOtherNamespace, "autoscaler"), // 5 + resource(typeAv1, tenancyDefaultOtherPeer, "amplifier"), // 6 + } +) + +func resource(typ *pbresource.Type, ten *pbresource.Tenancy, name string) *pbresource.Resource { + return &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typ, + Tenancy: ten, + Name: name, + Uid: "a", + }, + Version: "1", + } +} + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} + +func clone[T proto.Message](v T) T { return proto.Clone(v).(T) } From 531e9956161d8141aecf7c46636bbc0e2478dff6 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Mon, 6 Mar 2023 12:10:28 +0000 Subject: [PATCH 03/24] storage: implement in-memory backend --- internal/storage/inmem/backend.go | 67 +++++ internal/storage/inmem/backend_test.go | 27 ++ internal/storage/inmem/query.go | 187 ++++++++++++++ internal/storage/inmem/store.go | 328 +++++++++++++++++++++++++ internal/storage/inmem/watch.go | 188 ++++++++++++++ 5 files changed, 797 insertions(+) create mode 100644 internal/storage/inmem/backend.go create mode 100644 internal/storage/inmem/backend_test.go create mode 100644 internal/storage/inmem/query.go create mode 100644 internal/storage/inmem/store.go create mode 100644 internal/storage/inmem/watch.go diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go new file mode 100644 index 00000000000..8b5cb2c9716 --- /dev/null +++ b/internal/storage/inmem/backend.go @@ -0,0 +1,67 @@ +package inmem + +import ( + "context" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// NewBackend returns a purely in-memory storage backend. It's suitable for +// testing and development mode, but should NOT be used in production as it +// has no support for durable persistence, so all of your data will be lost +// when the process restarts or crashes. +// +// You must call Run before using the backend. +func NewBackend() (*Backend, error) { + store, err := NewStore() + if err != nil { + return nil, err + } + return &Backend{store}, nil +} + +// Backend is a purely in-memory storage backend implementation. +type Backend struct{ store *Store } + +// Run until the given context is canceled. This method blocks, so should be +// called in a goroutine. +func (b *Backend) Run(ctx context.Context) { b.store.Run(ctx) } + +// Read implements the storage.Backend interface. +func (b *Backend) Read(_ context.Context, id *pbresource.ID) (*pbresource.Resource, error) { + return b.store.Read(id) +} + +// ReadConsistent implements the storage.Backend interface. +func (b *Backend) ReadConsistent(_ context.Context, id *pbresource.ID) (*pbresource.Resource, error) { + return b.store.Read(id) +} + +// WriteCAS implements the storage.Backend interface. +func (b *Backend) WriteCAS(_ context.Context, res *pbresource.Resource, version string) (*pbresource.Resource, error) { + if err := b.store.WriteCAS(res, version); err != nil { + return nil, err + } + return res, nil +} + +// DeleteCAS implements the storage.Backend interface. +func (b *Backend) DeleteCAS(_ context.Context, id *pbresource.ID, version string) error { + return b.store.DeleteCAS(id, version) +} + +// List implements the storage.Backend interface. 
+func (b *Backend) List(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) { + return b.store.List(resType, tenancy, namePrefix) +} + +// WatchList implements the storage.Backend interface. +func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) { + return b.store.WatchList(resType, tenancy, namePrefix) +} + +// OwnerReferences implements the storage.Backend interface. +func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) { + return b.store.OwnerReferences(id) +} diff --git a/internal/storage/inmem/backend_test.go b/internal/storage/inmem/backend_test.go new file mode 100644 index 00000000000..1620ec29953 --- /dev/null +++ b/internal/storage/inmem/backend_test.go @@ -0,0 +1,27 @@ +package inmem_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/internal/storage/conformance" + "github.com/hashicorp/consul/internal/storage/inmem" +) + +func TestBackend_Conformance(t *testing.T) { + conformance.Test(t, conformance.TestOptions{ + NewBackend: func(t *testing.T) storage.Backend { + backend, err := inmem.NewBackend() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go backend.Run(ctx) + + return backend + }, + }) +} diff --git a/internal/storage/inmem/query.go b/internal/storage/inmem/query.go new file mode 100644 index 00000000000..ea5a23a830c --- /dev/null +++ b/internal/storage/inmem/query.go @@ -0,0 +1,187 @@ +package inmem + +import ( + "bytes" + "fmt" + "strings" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// indexSeparator delimits the segments of our radix tree keys. +const indexSeparator = "\x00" + +// readIndexer implements the memdb.Indexer interface (see FromArgs). +type readIndexer struct{} + +// FromArgs constructs a radix tree key from an ID for lookup. +func (i readIndexer) FromArgs(args ...any) ([]byte, error) { + if l := len(args); l != 1 { + return nil, fmt.Errorf("expected 1 arg, got: %d", l) + } + id, ok := args[0].(*pbresource.ID) + if !ok { + return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0]) + } + return indexFromID(id), nil +} + +// idIndexer implements the memdb.SingleIndexer and memdb.PrefixIndexer +// interfaces. It is used for indexing resources by their IDs. +type idIndexer struct{ readIndexer } + +// FromObject constructs a radix tree key from a Resource at write-time, or an +// ID at delete-time. +func (i idIndexer) FromObject(raw any) (bool, []byte, error) { + switch t := raw.(type) { + case *pbresource.ID: + return true, indexFromID(t), nil + case *pbresource.Resource: + return true, indexFromID(t.Id), nil + } + return false, nil, fmt.Errorf("expected *pbresource.Resource or *pbresource.ID, got: %T", raw) +} + +// PrefixFromArgs constructs a radix tree key prefix from a query for listing. +func (i idIndexer) PrefixFromArgs(args ...any) ([]byte, error) { + if l := len(args); l != 1 { + return nil, fmt.Errorf("expected 1 arg, got: %d", l) + } + + q, ok := args[0].(query) + if !ok { + return nil, fmt.Errorf("expected query, got: %T", args[0]) + } + return q.indexPrefix(), nil +} + +// ownerIndexer implements the memdb.SingleIndexer interface. 
It is used for
+// indexing resources by their owners.
+type ownerIndexer struct{ readIndexer }
+
+// FromObject constructs a radix tree key from a Resource at write-time.
+func (i ownerIndexer) FromObject(raw any) (bool, []byte, error) {
+	res, ok := raw.(*pbresource.Resource)
+	if !ok {
+		return false, nil, fmt.Errorf("expected *pbresource.Resource, got: %T", raw)
+	}
+	if res.Owner == nil {
+		return false, nil, nil
+	}
+	return true, indexFromID(res.Owner), nil
+}
+
+func indexFromType(t storage.UnversionedType) []byte {
+	var b indexBuilder
+	b.String(t.Group)
+	b.String(t.Kind)
+	return b.Bytes()
+}
+
+func indexFromTenancy(t *pbresource.Tenancy) []byte {
+	var b indexBuilder
+	b.String(t.Partition)
+	b.String(t.PeerName)
+	b.String(t.Namespace)
+	return b.Bytes()
+}
+
+func indexFromID(id *pbresource.ID) []byte {
+	var b indexBuilder
+	b.Raw(indexFromType(storage.UnversionedTypeFrom(id.Type)))
+	b.Raw(indexFromTenancy(id.Tenancy))
+	b.String(id.Name)
+	return b.Bytes()
+}
+
+type indexBuilder bytes.Buffer
+
+func (i *indexBuilder) Raw(v []byte) {
+	(*bytes.Buffer)(i).Write(v)
+}
+
+func (i *indexBuilder) String(s string) {
+	(*bytes.Buffer)(i).WriteString(s)
+	(*bytes.Buffer)(i).WriteString(indexSeparator)
+}
+
+func (i *indexBuilder) Bytes() []byte {
+	return (*bytes.Buffer)(i).Bytes()
+}
+
+type query struct {
+	resourceType storage.UnversionedType
+	tenancy      *pbresource.Tenancy
+	namePrefix   string
+}
+
+// indexPrefix is called by idIndexer.PrefixFromArgs to construct a radix tree
+// key prefix for list queries.
+//
+// Our radix tree keys are structured like so:
+//
+//	<group><kind><partition><peer><namespace><name>
+//
+// Where each segment is followed by a NULL terminator.
+//
+// In order to handle wildcard queries, we return a prefix up to the wildcarded
+// field. For example:
+//
+// Query: type={mesh,v1,service}, partition=default, peer=*, namespace=default
+// Prefix: mesh[NULL]service[NULL]default[NULL]
+//
+// (Note that the GroupVersion, v1 here, is not part of the key, because types
+// are stored unversioned.)
+//
+// Which means that we must manually apply filters after the wildcard (i.e.
+// namespace in the above example) in the matches method.
+func (q query) indexPrefix() []byte {
+	var b indexBuilder
+	b.Raw(indexFromType(q.resourceType))
+
+	if v := q.tenancy.Partition; v == storage.Wildcard {
+		return b.Bytes()
+	} else {
+		b.String(v)
+	}
+
+	if v := q.tenancy.PeerName; v == storage.Wildcard {
+		return b.Bytes()
+	} else {
+		b.String(v)
+	}
+
+	if v := q.tenancy.Namespace; v == storage.Wildcard {
+		return b.Bytes()
+	} else {
+		b.String(v)
+	}
+
+	if q.namePrefix != "" {
+		b.Raw([]byte(q.namePrefix))
+	}
+
+	return b.Bytes()
+}
+
+// matches applies filters that couldn't be applied by just doing a radix tree
+// prefix scan, because an earlier segment of the key prefix was wildcarded.
+//
+// See docs on query.indexPrefix for an example.
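+//
+// Concretely (a sketch): for the query
+//
+//	type={mesh,v1,service}, partition=*, peer=local, namespace=default
+//
+// indexPrefix stops at the wildcarded partition, so the radix scan spans every
+// partition, and matches must still filter out resources whose peer isn't
+// "local" or whose namespace isn't "default", and apply any name prefix.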
+func (q query) matches(res *pbresource.Resource) bool { + if q.tenancy.Partition != storage.Wildcard && res.Id.Tenancy.Partition != q.tenancy.Partition { + return false + } + + if q.tenancy.PeerName != storage.Wildcard && res.Id.Tenancy.PeerName != q.tenancy.PeerName { + return false + } + + if q.tenancy.Namespace != storage.Wildcard && res.Id.Tenancy.Namespace != q.tenancy.Namespace { + return false + } + + if len(q.namePrefix) != 0 && !strings.HasPrefix(res.Id.Name, q.namePrefix) { + return false + } + + return true +} diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go new file mode 100644 index 00000000000..a72264cbcf1 --- /dev/null +++ b/internal/storage/inmem/store.go @@ -0,0 +1,328 @@ +package inmem + +import ( + "context" + "time" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// Store implements an in-memory resource database using go-memdb. +// +// It can be used as a storage backend directly via the Backend type in this +// package, but also handles reads in our Raft backend, and can be used as a +// local cache when storing data in external systems (e.g. RDBMS, K/V stores). +type Store struct { + db *memdb.MemDB + pub *stream.EventPublisher +} + +// NewStore creates a Store. +// +// You must call Run before using the store. +func NewStore() (*Store, error) { + db, err := memdb.NewMemDB(&memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + tableNameMetadata: { + Name: tableNameMetadata, + Indexes: map[string]*memdb.IndexSchema{ + indexNameID: { + Name: indexNameID, + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "Key"}, + }, + }, + }, + tableNameResources: { + Name: tableNameResources, + Indexes: map[string]*memdb.IndexSchema{ + indexNameID: { + Name: indexNameID, + AllowMissing: false, + Unique: true, + Indexer: idIndexer{}, + }, + indexNameOwner: { + Name: indexNameOwner, + AllowMissing: true, + Unique: false, + Indexer: ownerIndexer{}, + }, + }, + }, + }, + }) + if err != nil { + return nil, err + } + + s := &Store{ + db: db, + pub: stream.NewEventPublisher(10 * time.Second), + } + s.pub.RegisterHandler(eventTopic, s.watchSnapshot, false) + + return s, nil +} + +// Run until the given context is canceled. This method blocks, so should be +// called in a goroutine. +func (s *Store) Run(ctx context.Context) { s.pub.Run(ctx) } + +// Read a resource using its ID. +// +// For more information, see the storage.Backend documentation. +func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + val, err := tx.First(tableNameResources, indexNameID, id) + if err != nil { + return nil, err + } + if val == nil { + return nil, storage.ErrNotFound + } + + res := val.(*pbresource.Resource) + + // Observe the Uid if it was given. + if id.Uid != "" && res.Id.Uid != id.Uid { + return nil, storage.ErrNotFound + } + + // Let the caller know they need to upgrade/downgrade the schema version. + if id.Type.GroupVersion != res.Id.Type.GroupVersion { + return nil, storage.GroupVersionMismatchError{ + RequestedType: id.Type, + Stored: res, + } + } + + return res, nil +} + +// WriteCAS performs an atomic Check-And-Set (CAS) write of a resource. +// +// For more information, see the storage.Backend documentation. 
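+//
+// A create-then-update sketch (hypothetical caller code; error handling and
+// the modify helper are illustrative):
+//
+//	_ = store.WriteCAS(res, "") // create: pass an empty version
+//	updated := modify(res)      // bump updated.Version, change the payload, etc.
+//	_ = store.WriteCAS(updated, res.Version) // must match the stored version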
+func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error {
+	tx := s.db.Txn(true)
+	defer tx.Abort()
+
+	existing, err := tx.First(tableNameResources, indexNameID, res.Id)
+	if err != nil {
+		return err
+	}
+
+	if existing == nil && vsn != "" {
+		return storage.ErrConflict
+	}
+
+	if existing != nil {
+		existingRes := existing.(*pbresource.Resource)
+
+		// Ensure CAS semantics.
+		if existingRes.Version != vsn {
+			return storage.ErrConflict
+		}
+
+		// Uid is immutable.
+		if existingRes.Id.Uid != res.Id.Uid {
+			return storage.ErrConflict
+		}
+	}
+
+	if err := tx.Insert(tableNameResources, res); err != nil {
+		return err
+	}
+
+	idx, err := incrementEventIndex(tx)
+	if err != nil {
+		return err
+	}
+	tx.Commit()
+
+	s.publishEvent(idx, &pbresource.WatchEvent{
+		Operation: pbresource.WatchEvent_OPERATION_UPSERT,
+		Resource:  res,
+	})
+
+	return nil
+}
+
+// DeleteCAS performs an atomic Check-And-Set (CAS) deletion of a resource.
+//
+// For more information, see the storage.Backend documentation.
+func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error {
+	tx := s.db.Txn(true)
+	defer tx.Abort()
+
+	existing, err := tx.First(tableNameResources, indexNameID, id)
+	if err != nil {
+		return err
+	}
+
+	// Deleting an already deleted resource is a no-op.
+	if existing == nil {
+		return nil
+	}
+
+	res := existing.(*pbresource.Resource)
+
+	// Deleting a resource using a previous Uid is a no-op.
+	if id.Uid != res.Id.Uid {
+		return nil
+	}
+
+	// Ensure CAS semantics.
+	if vsn != res.Version {
+		return storage.ErrConflict
+	}
+
+	if err := tx.Delete(tableNameResources, id); err != nil {
+		return err
+	}
+
+	idx, err := incrementEventIndex(tx)
+	if err != nil {
+		return err
+	}
+	tx.Commit()
+
+	s.publishEvent(idx, &pbresource.WatchEvent{
+		Operation: pbresource.WatchEvent_OPERATION_DELETE,
+		Resource:  res,
+	})
+
+	return nil
+}
+
+// List resources of the given type, tenancy, and optionally matching the given
+// name prefix.
+//
+// For more information, see the storage.Backend documentation.
+func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
+	tx := s.db.Txn(false)
+	defer tx.Abort()
+
+	return listTxn(tx, query{typ, ten, namePrefix})
+}
+
+// WatchList watches resources of the given type, tenancy, and optionally
+// matching the given name prefix.
+//
+// For more information, see the storage.Backend documentation.
+func (s *Store) WatchList(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) (*Watch, error) {
+	// If the user specifies a wildcard, we subscribe to events for resources in
+	// all partitions, peers, and namespaces, and manually filter out irrelevant
+	// stuff (in Watch.Next).
+	//
+	// If the user gave exact tenancy values, we can subscribe to events for the
+	// relevant resources only, which is far more efficient.
+	var sub stream.Subject
+	if ten.Partition == storage.Wildcard ||
+		ten.PeerName == storage.Wildcard ||
+		ten.Namespace == storage.Wildcard {
+		sub = wildcardSubject{typ}
+	} else {
+		sub = tenancySubject{typ, ten}
+	}
+
+	ss, err := s.pub.Subscribe(&stream.SubscribeRequest{
+		Topic:   eventTopic,
+		Subject: sub,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &Watch{
+		sub: ss,
+		query: query{
+			resourceType: typ,
+			tenancy:      ten,
+			namePrefix:   namePrefix,
+		},
+	}, nil
+}
+
+// OwnerReferences returns the IDs of resources owned by the resource with the
+// given ID.
+//
+// For more information, see the storage.Backend documentation.
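+//
+// A cascading-deletion sketch (hypothetical caller code; error handling
+// elided):
+//
+//	ids, _ := store.OwnerReferences(owner.Id)
+//	for _, id := range ids {
+//		if res, err := store.Read(id); err == nil {
+//			_ = store.DeleteCAS(id, res.Version)
+//		}
+//	}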
+func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) {
+	tx := s.db.Txn(false)
+	defer tx.Abort()
+
+	iter, err := tx.Get(tableNameResources, indexNameOwner, id)
+	if err != nil {
+		return nil, err
+	}
+
+	var refs []*pbresource.ID
+	for v := iter.Next(); v != nil; v = iter.Next() {
+		refs = append(refs, v.(*pbresource.Resource).Id)
+	}
+	return refs, nil
+}
+
+const (
+	tableNameMetadata  = "metadata"
+	tableNameResources = "resources"
+
+	indexNameID    = "id"
+	indexNameOwner = "owner"
+
+	metaKeyEventIndex = "index"
+)
+
+func listTxn(tx *memdb.Txn, q query) ([]*pbresource.Resource, error) {
+	iter, err := tx.Get(tableNameResources, indexNameID+"_prefix", q)
+	if err != nil {
+		return nil, err
+	}
+
+	list := make([]*pbresource.Resource, 0)
+	for v := iter.Next(); v != nil; v = iter.Next() {
+		res := v.(*pbresource.Resource)
+
+		if q.matches(res) {
+			list = append(list, res)
+		}
+	}
+	return list, nil
+}
+
+type meta struct {
+	Key   string
+	Value any
+}
+
+func incrementEventIndex(tx *memdb.Txn) (uint64, error) {
+	idx, err := currentEventIndex(tx)
+	if err != nil {
+		return 0, err
+	}
+
+	idx++
+	if err := tx.Insert(tableNameMetadata, meta{Key: metaKeyEventIndex, Value: idx}); err != nil {
+		return 0, err
+	}
+	return idx, nil
+}
+
+func currentEventIndex(tx *memdb.Txn) (uint64, error) {
+	v, err := tx.First(tableNameMetadata, indexNameID, metaKeyEventIndex)
+	if err != nil {
+		return 0, err
+	}
+	if v == nil {
+		return 0, nil
+	}
+	return v.(meta).Value.(uint64), nil
+}
diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go
new file mode 100644
index 00000000000..923b460ee68
--- /dev/null
+++ b/internal/storage/inmem/watch.go
@@ -0,0 +1,188 @@
+package inmem
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/agent/consul/stream"
+	"github.com/hashicorp/consul/internal/storage"
+	"github.com/hashicorp/consul/proto-public/pbresource"
+	"github.com/hashicorp/consul/proto/private/pbsubscribe"
+)
+
+// Watch implements the storage.Watch interface using a stream.Subscription.
+type Watch struct {
+	sub   *stream.Subscription
+	query query
+
+	// events holds excess events when they are bundled in a stream.PayloadEvents,
+	// until Next is called again.
+	events []stream.Event
+}
+
+// Next returns the next WatchEvent, blocking until one is available.
+func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
+	for {
+		e, err := w.nextEvent(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		event := e.Payload.(eventPayload).event
+		if w.query.matches(event.Resource) {
+			return event, nil
+		}
+	}
+}
+
+func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
+	if len(w.events) != 0 {
+		event := w.events[0]
+		w.events = w.events[1:]
+		return &event, nil
+	}
+
+	for {
+		e, err := w.sub.Next(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		if e.IsFramingEvent() {
+			continue
+		}
+
+		switch t := e.Payload.(type) {
+		case eventPayload:
+			return &e, nil
+		case *stream.PayloadEvents:
+			if len(t.Items) == 0 {
+				continue
+			}
+
+			event, rest := t.Items[0], t.Items[1:]
+			w.events = rest
+			return &event, nil
+		}
+	}
+}
+
+var eventTopic = stream.StringTopic("resources")
+
+type eventPayload struct {
+	subject stream.Subject
+	event   *pbresource.WatchEvent
+}
+
+func (p eventPayload) Subject() stream.Subject { return p.subject }
+
+// These methods are required by the stream.Payload interface, but we don't use them.
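+//
+// For orientation, the events published below are consumed by callers in a
+// loop over Watch.Next, along these lines (hypothetical caller code):
+//
+//	watch, _ := store.WatchList(resType, tenancy, "")
+//	for {
+//		event, err := watch.Next(ctx)
+//		if err != nil {
+//			return err
+//		}
+//		// event.Operation is OPERATION_UPSERT or OPERATION_DELETE.
+//	}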
+func (eventPayload) HasReadPermission(acl.Authorizer) bool { return false }
+func (eventPayload) ToSubscriptionEvent(uint64) *pbsubscribe.Event { return nil }
+
+type wildcardSubject struct {
+	resourceType storage.UnversionedType
+}
+
+func (s wildcardSubject) String() string {
+	return s.resourceType.Group + indexSeparator +
+		s.resourceType.Kind + indexSeparator +
+		storage.Wildcard
+}
+
+type tenancySubject struct {
+	resourceType storage.UnversionedType
+	tenancy      *pbresource.Tenancy
+}
+
+func (s tenancySubject) String() string {
+	return s.resourceType.Group + indexSeparator +
+		s.resourceType.Kind + indexSeparator +
+		s.tenancy.Partition + indexSeparator +
+		s.tenancy.PeerName + indexSeparator +
+		s.tenancy.Namespace
+}
+
+// publishEvent sends the event to the relevant Watches.
+func (s *Store) publishEvent(idx uint64, event *pbresource.WatchEvent) {
+	id := event.Resource.Id
+	resourceType := storage.UnversionedTypeFrom(id.Type)
+
+	// We publish two copies of the event: one to the tenancy-specific subject and
+	// another to a wildcard subject. Ideally, we'd be able to put the type in the
+	// topic instead and use stream.SubjectWildcard, but this requires knowing all
+	// types up-front (to register the snapshot handlers).
+	s.pub.Publish([]stream.Event{
+		{
+			Topic: eventTopic,
+			Index: idx,
+			Payload: eventPayload{
+				subject: wildcardSubject{resourceType},
+				event:   event,
+			},
+		},
+		{
+			Topic: eventTopic,
+			Index: idx,
+			Payload: eventPayload{
+				subject: tenancySubject{
+					resourceType: resourceType,
+					tenancy:      id.Tenancy,
+				},
+				event: event,
+			},
+		},
+	})
+}
+
+// watchSnapshot implements a stream.SnapshotFunc to provide upsert events for
+// the initial state of the world.
+func (s *Store) watchSnapshot(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
+	var q query
+	switch t := req.Subject.(type) {
+	case tenancySubject:
+		q.resourceType = t.resourceType
+		q.tenancy = t.tenancy
+	case wildcardSubject:
+		q.resourceType = t.resourceType
+		q.tenancy = &pbresource.Tenancy{
+			Partition: storage.Wildcard,
+			PeerName:  storage.Wildcard,
+			Namespace: storage.Wildcard,
+		}
+	default:
+		return 0, fmt.Errorf("unhandled subject type: %T", req.Subject)
+	}
+
+	tx := s.db.Txn(false)
+	defer tx.Abort()
+
+	idx, err := currentEventIndex(tx)
+	if err != nil {
+		return 0, err
+	}
+
+	results, err := listTxn(tx, q)
+	if err != nil {
+		return 0, err
+	}
+
+	events := make([]stream.Event, len(results))
+	for i, r := range results {
+		events[i] = stream.Event{
+			Topic: eventTopic,
+			Index: idx,
+			Payload: eventPayload{
+				subject: req.Subject,
+				event: &pbresource.WatchEvent{
+					Operation: pbresource.WatchEvent_OPERATION_UPSERT,
+					Resource:  r,
+				},
+			},
+		}
+	}
+	snap.Append(events)
+
+	return idx, nil
+}

From 74eef9a493f72141f8a1e7f6d3a7815758832b0e Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Mon, 6 Mar 2023 15:26:46 +0000
Subject: [PATCH 04/24] storage: better error messages

---
 internal/storage/storage.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index 7907bb1a7e1..1957890ad3d 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -18,12 +18,12 @@ var (
 	// ErrConflict indicates that the attempted write failed because of a version
 	// or UID mismatch.
- ErrConflict = errors.New("CAS operation failed with conflict") + ErrConflict = errors.New("operation failed because of a Version or Uid mismatch") // ErrInconsistent indicates that the attempted write or consistent read could // not be achieved because of a consistency or availability issue (e.g. loss of // quorum, or when interacting with a Raft follower). - ErrInconsistent = errors.New("cannot satisfy required consistency") + ErrInconsistent = errors.New("cannot satisfy consistency requirements") ) // Backend provides the low-level storage substrate for resources. It can be From 2731ba87a5beddd1a421c40878177f05a766517e Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Mon, 13 Mar 2023 12:14:34 +0000 Subject: [PATCH 05/24] storage: owner references should be anchored to a specific uid --- internal/storage/conformance/conformance.go | 9 +++++ internal/storage/inmem/query.go | 43 ++++++++++++++------- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index e1c23150c6c..786837dce5c 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -524,6 +524,15 @@ func testOwnerReferences(t *testing.T, opts TestOptions) { require.NoError(t, err) require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + t.Run("references are anchored to a specific uid", func(t *testing.T) { + id := clone(owner.Id) + id.Uid = "different" + + refs, err := backend.OwnerReferences(ctx, id) + require.NoError(t, err) + require.Empty(t, refs) + }) + t.Run("deleting the owner doesn't remove the references", func(t *testing.T) { require.NoError(t, backend.DeleteCAS(ctx, owner.Id, owner.Version)) diff --git a/internal/storage/inmem/query.go b/internal/storage/inmem/query.go index ea5a23a830c..0396947fc1c 100644 --- a/internal/storage/inmem/query.go +++ b/internal/storage/inmem/query.go @@ -12,11 +12,13 @@ import ( // indexSeparator delimits the segments of our radix tree keys. const indexSeparator = "\x00" -// readIndexer implements the memdb.Indexer interface (see FromArgs). -type readIndexer struct{} +// idIndexer implements the memdb.Indexer, memdb.SingleIndexer and +// memdb.PrefixIndexer interfaces. It is used for indexing resources +// by their IDs. +type idIndexer struct{} // FromArgs constructs a radix tree key from an ID for lookup. -func (i readIndexer) FromArgs(args ...any) ([]byte, error) { +func (i idIndexer) FromArgs(args ...any) ([]byte, error) { if l := len(args); l != 1 { return nil, fmt.Errorf("expected 1 arg, got: %d", l) } @@ -24,21 +26,17 @@ func (i readIndexer) FromArgs(args ...any) ([]byte, error) { if !ok { return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0]) } - return indexFromID(id), nil + return indexFromID(id, false), nil } -// idIndexer implements the memdb.SingleIndexer and memdb.PrefixIndexer -// interfaces. It is used for indexing resources by their IDs. -type idIndexer struct{ readIndexer } - // FromObject constructs a radix tree key from a Resource at write-time, or an // ID at delete-time. 
func (i idIndexer) FromObject(raw any) (bool, []byte, error) {
 	switch t := raw.(type) {
 	case *pbresource.ID:
-		return true, indexFromID(t), nil
+		return true, indexFromID(t, false), nil
 	case *pbresource.Resource:
-		return true, indexFromID(t.Id), nil
+		return true, indexFromID(t.Id, false), nil
 	}
 	return false, nil, fmt.Errorf("expected *pbresource.Resource or *pbresource.ID, got: %T", raw)
 }
@@ -56,9 +54,21 @@ func (i idIndexer) PrefixFromArgs(args ...any) ([]byte, error) {
 	return q.indexPrefix(), nil
 }
 
-// ownerIndexer implements the memdb.SingleIndexer interface. It is used for
-// indexing resources by their owners.
-type ownerIndexer struct{ readIndexer }
+// ownerIndexer implements the memdb.Indexer and memdb.SingleIndexer interfaces.
+// It is used for indexing resources by their owners.
+type ownerIndexer struct{}
+
+// FromArgs constructs a radix tree key from an ID for lookup.
+func (i ownerIndexer) FromArgs(args ...any) ([]byte, error) {
+	if l := len(args); l != 1 {
+		return nil, fmt.Errorf("expected 1 arg, got: %d", l)
+	}
+	id, ok := args[0].(*pbresource.ID)
+	if !ok {
+		return nil, fmt.Errorf("expected *pbresource.ID, got: %T", args[0])
+	}
+	return indexFromID(id, true), nil
+}
 
 // FromObject constructs a radix tree key from a Resource at write-time.
 func (i ownerIndexer) FromObject(raw any) (bool, []byte, error) {
@@ -69,7 +79,7 @@ func (i ownerIndexer) FromObject(raw any) (bool, []byte, error) {
 	if res.Owner == nil {
 		return false, nil, nil
 	}
-	return true, indexFromID(res.Owner), nil
+	return true, indexFromID(res.Owner, true), nil
 }
 
 func indexFromType(t storage.UnversionedType) []byte {
@@ -87,11 +97,14 @@ func indexFromTenancy(t *pbresource.Tenancy) []byte {
 	return b.Bytes()
 }
 
-func indexFromID(id *pbresource.ID) []byte {
+func indexFromID(id *pbresource.ID, includeUid bool) []byte {
 	var b indexBuilder
 	b.Raw(indexFromType(storage.UnversionedTypeFrom(id.Type)))
 	b.Raw(indexFromTenancy(id.Tenancy))
 	b.String(id.Name)
+	if includeUid {
+		b.String(id.Uid)
+	}
 	return b.Bytes()
 }

From c25a7ec6825f5b559101682f6dbb34379886c98f Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Mon, 13 Mar 2023 12:17:14 +0000
Subject: [PATCH 06/24] Remove unused test function

---
 internal/storage/conformance/conformance.go | 28 ---------------------
 1 file changed, 28 deletions(-)

diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go
index 786837dce5c..bca4b54462d 100644
--- a/internal/storage/conformance/conformance.go
+++ b/internal/storage/conformance/conformance.go
@@ -450,34 +450,6 @@ func testListWatch(t *testing.T, opts TestOptions) {
 	})
 }
 
-func testWatch(t *testing.T, opts TestOptions) {
-	backend := opts.NewBackend(t)
-	ctx := testContext(t)
-
-	r1 := &pbresource.Resource{
-		Id: &pbresource.ID{
-			Type:    typeAv1,
-			Tenancy: tenancyDefault,
-			Name:    "web",
-			Uid:     "a",
-		},
-		Version: "1",
-	}
-	_, err := backend.WriteCAS(ctx, r1, "")
-	require.NoError(t, err)
-
-	watch, err := backend.WatchList(ctx, storage.UnversionedTypeFrom(typeAv1), tenancyDefault, "")
-	require.NoError(t, err)
-
-	timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
-	t.Cleanup(cancel)
-	event, err := watch.Next(timeoutCtx)
-	require.NoError(t, err)
-
-	require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT.String(), event.Operation.String())
-	require.Equal(t, r1, event.Resource)
-}
-

From d5733f99fed06d0089e2d25937936858d54cb1af Mon Sep 17 00:00:00 2001
From: Dan Upton Date: Tue, 14 Mar 2023 12:32:34 +0000 Subject: [PATCH 07/24] storage: clarify List docs Co-authored-by: Matt Keeler --- internal/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 1957890ad3d..ceefba8ce14 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -143,7 +143,7 @@ type Backend interface { // // # Tenancy Wildcard // - // In order to list resources across multiple tenancy units (e.g. partitions) + // In order to list resources across multiple tenancy units (e.g. namespaces) // pass the Wildcard sentinel value in tenancy fields. // // # GroupVersion From 56c4c3a0abfe6409af0fc54eabbe1cd0ab3b6dc8 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Tue, 14 Mar 2023 18:04:44 +0000 Subject: [PATCH 08/24] storage: fix potential out-of-order events --- internal/storage/conformance/conformance.go | 10 ++++++++++ internal/storage/inmem/store.go | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index bca4b54462d..485a7fd59ca 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" ) type TestOptions struct { @@ -431,6 +432,11 @@ func testListWatch(t *testing.T, opts TestOptions) { require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) require.Containsf(t, tc.results, event.Resource, "resource not in expected results: %s", event.Resource.Id) + + // Check that Read is sequentially consistent with Watch. + readRes, err := backend.Read(ctx, event.Resource.Id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, event.Resource, readRes) } // Delete a random resource to check we get an event. @@ -445,6 +451,10 @@ func testListWatch(t *testing.T, opts TestOptions) { require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, event.Operation) require.Equal(t, del, event.Resource) + + // Check that Read is sequentially consistent with Watch. + _, err = backend.Read(ctx, del.Id) + require.ErrorIs(t, err, storage.ErrNotFound) }) } }) diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go index a72264cbcf1..4d3c98e5515 100644 --- a/internal/storage/inmem/store.go +++ b/internal/storage/inmem/store.go @@ -2,6 +2,7 @@ package inmem import ( "context" + "sync" "time" "github.com/hashicorp/go-memdb" @@ -19,6 +20,20 @@ import ( type Store struct { db *memdb.MemDB pub *stream.EventPublisher + + // eventLock is used to serialize operations that result in the publishing of + // events (i.e. writes and deletes) to ensure correct ordering when there are + // concurrent writers. + // + // We cannot rely on MemDB's write lock for this, because events must be + // published *after* the transaction is committed to provide sequential + // consistency between Watch and Read calls. In other words, if we were to + // publish an event before the transaction was committed, there would be a + // small window of time where a watcher (e.g. controller) could try to Read + // the resource and not get the version they were notified about. + // + // Without this lock, it would be possible to publish events out-of-order. + eventLock sync.Mutex } // NewStore creates a Store. 
@@ -111,6 +126,9 @@ func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) { // // For more information, see the storage.Backend documentation. func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { + s.eventLock.Lock() + defer s.eventLock.Unlock() + tx := s.db.Txn(true) defer tx.Abort() @@ -159,6 +177,9 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { // // For more information, see the storage.Backend documentation. func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error { + s.eventLock.Lock() + defer s.eventLock.Unlock() + tx := s.db.Txn(true) defer tx.Abort() From 033b612bc8f4c2b63defedb5aa23ed08c083d124 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 15 Mar 2023 11:36:01 +0000 Subject: [PATCH 09/24] storage: clarify WatchList consistency model --- internal/storage/storage.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index ceefba8ce14..e4e23387ffd 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -164,8 +164,21 @@ type Backend interface { // immediately, and will be followed by delta events whenever resources are // written or deleted. // - // See List docs for details about Tenancy Wildcard, GroupVersion, and - // Consistency. + // # Consistency + // + // WatchList makes no guarantees about event timeliness (e.g. an event for a + // write may not be received immediately), but it does guarantee that events + // will be emitted in the correct order. + // + // There's also a sequential consistency guarantee between Read and WatchList, + // such that Read will never return data that is older than the most recent + // event you received. Note: this guarantee holds at the (in-process) storage + // backend level, only. Controllers and other users of the Resource Service API + // must remain connected to the same Consul server process to avoid receiving + // events about writes that they then cannot read. In other words, it is *not* + // linearizable: https://jepsen.io/consistency/models/sequential + // + // See List docs for details about Tenancy Wildcard and GroupVersion. WatchList(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (Watch, error) // OwnerReferences returns the IDs of resources owned by the resource with the From f54bcb8024ab0b3176850287120799df004cbaf3 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 15 Mar 2023 11:42:08 +0000 Subject: [PATCH 10/24] storage: s/Check-And-Set/Compare-And-Swap/ --- internal/storage/inmem/store.go | 4 ++-- internal/storage/storage.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go index 4d3c98e5515..40e8dada92b 100644 --- a/internal/storage/inmem/store.go +++ b/internal/storage/inmem/store.go @@ -122,7 +122,7 @@ func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) { return res, nil } -// WriteCAS performs an atomic Check-And-Set (CAS) write of a resource. +// WriteCAS performs an atomic Compare-And-Swap (CAS) write of a resource. // // For more information, see the storage.Backend documentation. func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { @@ -173,7 +173,7 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { return nil } -// DeleteCAS performs an atomic Check-And-Set (CAS) deletion of a resource. 
+// DeleteCAS performs an atomic Compare-And-Swap (CAS) deletion of a resource. // // For more information, see the storage.Backend documentation. func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index e4e23387ffd..37967bc5014 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -86,7 +86,7 @@ type Backend interface { // Use ReadConsistent sparingly, and prefer Read when possible. ReadConsistent(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) - // WriteCAS performs an atomic CAS (Check-And-Set) write of a resource based + // WriteCAS performs an atomic CAS (Compare-And-Swap) write of a resource based // on its version. The given version will be compared to what is stored, and // if it does not match, ErrConflict will be returned. To create new resources, // pass an empty version string. @@ -110,7 +110,7 @@ type Backend interface { // See Backend docs for more details. WriteCAS(ctx context.Context, res *pbresource.Resource, version string) (*pbresource.Resource, error) - // DeleteCAS performs an atomic CAS (Check-And-Set) deletion of a resource + // DeleteCAS performs an atomic CAS (Compare-And-Swap) deletion of a resource // based on its version. The given version will be compared to what is stored, // and if it does not match, ErrConflict will be returned. // From 0167a5bc1eedd1d7ba6badae9a12ec366e7bb7dc Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 15 Mar 2023 11:45:22 +0000 Subject: [PATCH 11/24] storage: move event construction into publishEvent --- internal/storage/inmem/store.go | 10 ++-------- internal/storage/inmem/watch.go | 5 +++-- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go index 40e8dada92b..7501d157f76 100644 --- a/internal/storage/inmem/store.go +++ b/internal/storage/inmem/store.go @@ -165,10 +165,7 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { } tx.Commit() - s.publishEvent(idx, &pbresource.WatchEvent{ - Operation: pbresource.WatchEvent_OPERATION_UPSERT, - Resource: res, - }) + s.publishEvent(idx, pbresource.WatchEvent_OPERATION_UPSERT, res) return nil } @@ -215,10 +212,7 @@ func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error { } tx.Commit() - s.publishEvent(idx, &pbresource.WatchEvent{ - Operation: pbresource.WatchEvent_OPERATION_DELETE, - Resource: res, - }) + s.publishEvent(idx, pbresource.WatchEvent_OPERATION_DELETE, res) return nil } diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go index 923b460ee68..5356ea7a348 100644 --- a/internal/storage/inmem/watch.go +++ b/internal/storage/inmem/watch.go @@ -105,9 +105,10 @@ func (s tenancySubject) String() string { } // publishEvent sends the event to the relevant Watches. -func (s *Store) publishEvent(idx uint64, event *pbresource.WatchEvent) { - id := event.Resource.Id +func (s *Store) publishEvent(idx uint64, op pbresource.WatchEvent_Operation, res *pbresource.Resource) { + id := res.Id resourceType := storage.UnversionedTypeFrom(id.Type) + event := &pbresource.WatchEvent{Operation: op, Resource: res} // We publish two copies of the event: one to the tenancy-specific subject and // another to a wildcard subject. 
Ideally, we'd be able to put the type in the From 048712d39e062ea4f742965ed7655e7e626248ea Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 15 Mar 2023 12:11:33 +0000 Subject: [PATCH 12/24] storage: document Read-Modify-Write patterns --- internal/storage/storage.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 37967bc5014..eacd7f33180 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -47,6 +47,22 @@ var ( // In order to support automatic translation between schema versions, we only // store a single version of a resource, and treat types with the same Group // and Kind, but different GroupVersions, as equivalent. +// +// # Read-Modify-Write Patterns +// +// All writes at the storage backend level are CAS (Compare-And-Swap) operations +// where the caller must provide the resource in its entirety, along with the +// current version string. +// +// Non-CAS writes should be implemented at a higher level (i.e. in the Resource +// Service) by reading the resource, applying the user's requested modifications, +// and writing it back. This allows us to ensure we're correctly carrying over +// the resource's Status and Uid, without requiring support for partial update +// or "patch" operations from external storage systems. +// +// In cases where there are concurrent interleaving writes made to a resource, +// it's likely that a CAS operation will fail, so callers may need to put their +// Read-Modify-Write cycle in a retry loop. type Backend interface { // Read a resource using its ID. // From fbbbec9d38d2b79b88449f8b69b7dba5bc0758a4 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 15 Mar 2023 16:20:23 +0000 Subject: [PATCH 13/24] storage: add clarifying comment about resource creation --- internal/storage/inmem/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go index 7501d157f76..15745b2c1ad 100644 --- a/internal/storage/inmem/store.go +++ b/internal/storage/inmem/store.go @@ -137,6 +137,7 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { return err } + // Callers provide an empty version string on initial resource creation. if existing == nil && vsn != "" { return storage.ErrConflict } From e17b50b807eeae3fd7b005466d72a01d71a0f720 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Thu, 16 Mar 2023 10:17:59 +0000 Subject: [PATCH 14/24] storage: clarify OwnerReferences consistency --- internal/storage/storage.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index eacd7f33180..7c8914d2838 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -194,6 +194,9 @@ type Backend interface { // events about writes that they then cannot read. In other words, it is *not* // linearizable: https://jepsen.io/consistency/models/sequential // + // There's a similar guarantee between WatchList and OwnerReferences, see the + // OwnerReferences docs for more information. + // // See List docs for details about Tenancy Wildcard and GroupVersion. WatchList(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (Watch, error) @@ -202,8 +205,17 @@ type Backend interface { // // # Consistency // - // OwnerReferences makes no guarantees about consistency, and may return stale - // results. 
+ // OwnerReferences may return stale results, but is sequentially consistent + // with events received from WatchList. In practice, this means that if you + // learn that a resource has been deleted through a watch event, the results + // you receive from OwnerReferences will contain all references that existed + // at the time the owner was deleted. It doesn't make any guarantees about + // references that are created *after* the owner was deleted, though, so you + // must either prevent that from happening (e.g. by performing a consistent + // read of the owner in the write-path, which has its own ordering/correctness + // challenges), or by calling OwnerReferences after the expected window of + // inconsistency (e.g. deferring cascading deletion, or doing a second pass + // an hour later). OwnerReferences(ctx context.Context, id *pbresource.ID) ([]*pbresource.ID, error) } From f441187cacf573a7fa8988358f68099769bfb87e Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 17 Mar 2023 12:55:40 +0000 Subject: [PATCH 15/24] storage: separate ErrConflict into two errors --- internal/storage/conformance/conformance.go | 12 ++++++------ internal/storage/inmem/store.go | 8 ++++---- internal/storage/storage.go | 19 ++++++++++++------- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index 485a7fd59ca..ca0e2242f2e 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -118,7 +118,7 @@ func testCASWrite(t *testing.T, opts TestOptions) { } _, err := backend.WriteCAS(ctx, v1, "some-version") - require.ErrorIs(t, err, storage.ErrConflict) + require.ErrorIs(t, err, storage.ErrCASFailure) _, err = backend.WriteCAS(ctx, v1, "") require.NoError(t, err) @@ -133,10 +133,10 @@ func testCASWrite(t *testing.T, opts TestOptions) { v3.Version = "3" _, err = backend.WriteCAS(ctx, v3, "") - require.ErrorIs(t, err, storage.ErrConflict) + require.ErrorIs(t, err, storage.ErrCASFailure) _, err = backend.WriteCAS(ctx, v3, v1.Version) - require.ErrorIs(t, err, storage.ErrConflict) + require.ErrorIs(t, err, storage.ErrCASFailure) }) t.Run("uid immutability", func(t *testing.T) { @@ -165,7 +165,7 @@ func testCASWrite(t *testing.T, opts TestOptions) { v2.Id.Uid = "b" _, err = backend.WriteCAS(ctx, v2, v1.Version) - require.ErrorIs(t, err, storage.ErrConflict) + require.ErrorIs(t, err, storage.ErrWrongUid) v2.Id.Uid = v1.Id.Uid _, err = backend.WriteCAS(ctx, v2, v1.Version) @@ -200,8 +200,8 @@ func testCASDelete(t *testing.T, opts TestOptions) { _, err := backend.WriteCAS(ctx, res, "") require.NoError(t, err) - require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, ""), storage.ErrConflict) - require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "2"), storage.ErrConflict) + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, ""), storage.ErrCASFailure) + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "2"), storage.ErrCASFailure) require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version)) diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go index 15745b2c1ad..779f36f22cb 100644 --- a/internal/storage/inmem/store.go +++ b/internal/storage/inmem/store.go @@ -139,7 +139,7 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { // Callers provide an empty version string on initial resource creation. 
if existing == nil && vsn != "" { - return storage.ErrConflict + return storage.ErrCASFailure } if existing != nil { @@ -147,12 +147,12 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error { // Ensure CAS semantics. if existingRes.Version != vsn { - return storage.ErrConflict + return storage.ErrCASFailure } // Uid is immutable. if existingRes.Id.Uid != res.Id.Uid { - return storage.ErrConflict + return storage.ErrWrongUid } } @@ -200,7 +200,7 @@ func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error { // Ensure CAS semantics. if vsn != res.Version { - return storage.ErrConflict + return storage.ErrCASFailure } if err := tx.Delete(tableNameResources, id); err != nil { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 7c8914d2838..e2b698e1bc9 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -16,9 +16,14 @@ var ( // ErrNotFound indicates that the resource could not be found. ErrNotFound = errors.New("resource not found") - // ErrConflict indicates that the attempted write failed because of a version - // or UID mismatch. - ErrConflict = errors.New("operation failed because of a Version or Uid mismatch") + // ErrCASFailure indicates that the attempted write failed because the given + // version does not match what is currently stored. + ErrCASFailure = errors.New("CAS operation failed because the given version doesn't match what is stored") + + // ErrWrongUid indicates that the attempted write failed because the resource's + // Uid doesn't match what is currently stored (e.g. the caller is trying to + // operate on a deleted resource with the same name). + ErrWrongUid = errors.New("write failed because the given uid doesn't match what is stored") // ErrInconsistent indicates that the attempted write or consistent read could // not be achieved because of a consistency or availability issue (e.g. loss of @@ -103,8 +108,8 @@ type Backend interface { ReadConsistent(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) // WriteCAS performs an atomic CAS (Compare-And-Swap) write of a resource based - // on its version. The given version will be compared to what is stored, and - // if it does not match, ErrConflict will be returned. To create new resources, + // on its version. The given version will be compared to what is stored, and if + // it does not match, ErrCASFailure will be returned. To create new resources, // pass an empty version string. // // If a write cannot be performed because of a consistency or availability @@ -114,7 +119,7 @@ type Backend interface { // # UIDs // // UIDs are immutable, so if the given resource's Uid field doesn't match what - // is stored, ErrConflict will be returned. + // is stored, ErrWrongUid will be returned. // // See Backend docs for more details. // @@ -128,7 +133,7 @@ type Backend interface { // DeleteCAS performs an atomic CAS (Compare-And-Swap) deletion of a resource // based on its version. The given version will be compared to what is stored, - // and if it does not match, ErrConflict will be returned. + // and if it does not match, ErrCASFailure will be returned. // // If the resource does not exist (i.e. has already been deleted) no error will // be returned. 
From 84ff540864c7629b48be4886a886a6653c0ef1ff Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 17 Mar 2023 13:48:17 +0000 Subject: [PATCH 16/24] storage: make backends responsible for managing the version --- internal/storage/conformance/conformance.go | 113 +++++++++----------- internal/storage/inmem/backend.go | 20 +++- internal/storage/storage.go | 8 +- proto/private/prototest/testing.go | 14 +++ 4 files changed, 84 insertions(+), 71 deletions(-) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index ca0e2242f2e..ce3f5159207 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/proto-public/pbresource" @@ -49,15 +50,14 @@ func testRead(t *testing.T, opts TestOptions) { Name: "web", Uid: "a", }, - Version: "1", } - _, err := backend.WriteCAS(ctx, res, "") + _, err := backend.WriteCAS(ctx, res) require.NoError(t, err) t.Run("simple", func(t *testing.T) { output, err := backend.Read(ctx, res.Id) require.NoError(t, err) - require.Equal(t, res, output) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) }) t.Run("no uid", func(t *testing.T) { @@ -66,7 +66,7 @@ func testRead(t *testing.T, opts TestOptions) { output, err := backend.Read(ctx, id) require.NoError(t, err) - require.Equal(t, res, output) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) }) t.Run("different id", func(t *testing.T) { @@ -95,7 +95,7 @@ func testRead(t *testing.T, opts TestOptions) { var e storage.GroupVersionMismatchError if errors.As(err, &e) { require.Equal(t, id.Type, e.RequestedType) - require.Equal(t, res, e.Stored) + prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion) } else { t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err) } @@ -114,28 +114,29 @@ func testCASWrite(t *testing.T, opts TestOptions) { Name: "web", Uid: "a", }, - Version: "1", } - _, err := backend.WriteCAS(ctx, v1, "some-version") + v1.Version = "some-version" + _, err := backend.WriteCAS(ctx, v1) require.ErrorIs(t, err, storage.ErrCASFailure) - _, err = backend.WriteCAS(ctx, v1, "") + v1.Version = "" + v1, err = backend.WriteCAS(ctx, v1) require.NoError(t, err) + require.NotEmpty(t, v1.Version) - v2 := clone(v1) - v2.Version = "2" - - _, err = backend.WriteCAS(ctx, v2, v1.Version) + v2, err := backend.WriteCAS(ctx, v1) require.NoError(t, err) + require.NotEmpty(t, v2.Version) + require.NotEqual(t, v1.Version, v2.Version) v3 := clone(v2) - v3.Version = "3" - - _, err = backend.WriteCAS(ctx, v3, "") + v3.Version = "" + _, err = backend.WriteCAS(ctx, v3) require.ErrorIs(t, err, storage.ErrCASFailure) - _, err = backend.WriteCAS(ctx, v3, v1.Version) + v3.Version = v1.Version + _, err = backend.WriteCAS(ctx, v3) require.ErrorIs(t, err, storage.ErrCASFailure) }) @@ -143,42 +144,38 @@ func testCASWrite(t *testing.T, opts TestOptions) { backend := opts.NewBackend(t) ctx := testContext(t) - v1 := &pbresource.Resource{ + v1, err := backend.WriteCAS(ctx, &pbresource.Resource{ Id: &pbresource.ID{ Type: typeB, Tenancy: tenancyDefault, Name: "web", Uid: "a", }, - Version: "1", - } - _, err := backend.WriteCAS(ctx, v1, "") + }) require.NoError(t, err) // Uid cannot change. 
v2 := clone(v1) - v2.Version = "2" - v2.Id.Uid = "" - _, err = backend.WriteCAS(ctx, v2, v1.Version) + _, err = backend.WriteCAS(ctx, v2) require.Error(t, err) v2.Id.Uid = "b" - _, err = backend.WriteCAS(ctx, v2, v1.Version) + _, err = backend.WriteCAS(ctx, v2) require.ErrorIs(t, err, storage.ErrWrongUid) v2.Id.Uid = v1.Id.Uid - _, err = backend.WriteCAS(ctx, v2, v1.Version) + v2, err = backend.WriteCAS(ctx, v2) require.NoError(t, err) // Uid can change after original resource is deleted. require.NoError(t, backend.DeleteCAS(ctx, v2.Id, v2.Version)) v3 := clone(v2) - v3.Version = "3" v3.Id.Uid = "b" + v3.Version = "" - _, err = backend.WriteCAS(ctx, v2, "") + _, err = backend.WriteCAS(ctx, v3) require.NoError(t, err) }) } @@ -188,20 +185,18 @@ func testCASDelete(t *testing.T, opts TestOptions) { backend := opts.NewBackend(t) ctx := testContext(t) - res := &pbresource.Resource{ + res, err := backend.WriteCAS(ctx, &pbresource.Resource{ Id: &pbresource.ID{ Type: typeB, Tenancy: tenancyDefault, Name: "web", Uid: "a", }, - Version: "1", - } - _, err := backend.WriteCAS(ctx, res, "") + }) require.NoError(t, err) require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, ""), storage.ErrCASFailure) - require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "2"), storage.ErrCASFailure) + require.ErrorIs(t, backend.DeleteCAS(ctx, res.Id, "some-version"), storage.ErrCASFailure) require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version)) @@ -213,16 +208,14 @@ func testCASDelete(t *testing.T, opts TestOptions) { backend := opts.NewBackend(t) ctx := testContext(t) - res := &pbresource.Resource{ + res, err := backend.WriteCAS(ctx, &pbresource.Resource{ Id: &pbresource.ID{ Type: typeB, Tenancy: tenancyDefault, Name: "web", Uid: "a", }, - Version: "1", - } - _, err := backend.WriteCAS(ctx, res, "") + }) require.NoError(t, err) id := clone(res.Id) @@ -370,7 +363,7 @@ func testListWatch(t *testing.T, opts TestOptions) { ctx := testContext(t) for _, r := range seedData { - _, err := backend.WriteCAS(ctx, r, "") + _, err := backend.WriteCAS(ctx, r) require.NoError(t, err) } @@ -378,7 +371,7 @@ func testListWatch(t *testing.T, opts TestOptions) { t.Run(desc, func(t *testing.T) { res, err := backend.List(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) require.NoError(t, err) - require.ElementsMatch(t, res, tc.results) + prototest.AssertElementsMatch(t, res, tc.results, ignoreVersion) }) } }) @@ -391,7 +384,7 @@ func testListWatch(t *testing.T, opts TestOptions) { // Write the seed data before the watch has been established. for _, r := range seedData { - _, err := backend.WriteCAS(ctx, r, "") + _, err := backend.WriteCAS(ctx, r) require.NoError(t, err) } @@ -406,7 +399,7 @@ func testListWatch(t *testing.T, opts TestOptions) { require.NoError(t, err) require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) - require.Containsf(t, tc.results, event.Resource, "resource not in expected results: %s", event.Resource.Id) + prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) } }) @@ -419,7 +412,7 @@ func testListWatch(t *testing.T, opts TestOptions) { // Write the seed data after the watch has been established. 
for _, r := range seedData { - _, err := backend.WriteCAS(ctx, r, "") + _, err := backend.WriteCAS(ctx, r) require.NoError(t, err) } @@ -431,7 +424,7 @@ func testListWatch(t *testing.T, opts TestOptions) { require.NoError(t, err) require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) - require.Containsf(t, tc.results, event.Resource, "resource not in expected results: %s", event.Resource.Id) + prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) // Check that Read is sequentially consistent with Watch. readRes, err := backend.Read(ctx, event.Resource.Id) @@ -440,7 +433,8 @@ func testListWatch(t *testing.T, opts TestOptions) { } // Delete a random resource to check we get an event. - del := tc.results[rand.Intn(len(tc.results))] + del, err := backend.Read(ctx, tc.results[rand.Intn(len(tc.results))].Id) + require.NoError(t, err) require.NoError(t, backend.DeleteCAS(ctx, del.Id, del.Version)) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -450,7 +444,7 @@ func testListWatch(t *testing.T, opts TestOptions) { require.NoError(t, err) require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, event.Operation) - require.Equal(t, del, event.Resource) + prototest.AssertDeepEqual(t, del, event.Resource) // Check that Read is sequentially consistent with Watch. _, err = backend.Read(ctx, del.Id) @@ -464,47 +458,41 @@ func testOwnerReferences(t *testing.T, opts TestOptions) { backend := opts.NewBackend(t) ctx := testContext(t) - owner := &pbresource.Resource{ + owner, err := backend.WriteCAS(ctx, &pbresource.Resource{ Id: &pbresource.ID{ Type: typeAv1, Tenancy: tenancyDefault, Name: "owner", Uid: "a", }, - Version: "1", - } - _, err := backend.WriteCAS(ctx, owner, "") + }) require.NoError(t, err) - r1 := &pbresource.Resource{ + r1, err := backend.WriteCAS(ctx, &pbresource.Resource{ Id: &pbresource.ID{ Type: typeB, Tenancy: tenancyDefault, Name: "r1", Uid: "a", }, - Owner: owner.Id, - Version: "1", - } - _, err = backend.WriteCAS(ctx, r1, "") + Owner: owner.Id, + }) require.NoError(t, err) - r2 := &pbresource.Resource{ + r2, err := backend.WriteCAS(ctx, &pbresource.Resource{ Id: &pbresource.ID{ Type: typeAv2, Tenancy: tenancyDefault, Name: "r2", Uid: "a", }, - Owner: owner.Id, - Version: "1", - } - _, err = backend.WriteCAS(ctx, r2, "") + Owner: owner.Id, + }) require.NoError(t, err) refs, err := backend.OwnerReferences(ctx, owner.Id) require.NoError(t, err) - require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) t.Run("references are anchored to a specific uid", func(t *testing.T) { id := clone(owner.Id) @@ -520,7 +508,7 @@ func testOwnerReferences(t *testing.T, opts TestOptions) { refs, err = backend.OwnerReferences(ctx, owner.Id) require.NoError(t, err) - require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) }) t.Run("deleting the owned resource removes its reference", func(t *testing.T) { @@ -528,7 +516,7 @@ func testOwnerReferences(t *testing.T, opts TestOptions) { refs, err = backend.OwnerReferences(ctx, owner.Id) require.NoError(t, err) - require.ElementsMatch(t, refs, []*pbresource.ID{r1.Id}) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id}) }) } @@ -579,6 +567,8 @@ var ( resource(typeAv1, tenancyDefaultOtherNamespace, "autoscaler"), // 5 resource(typeAv1, tenancyDefaultOtherPeer, "amplifier"), // 6 } + + ignoreVersion = protocmp.IgnoreFields(&pbresource.Resource{}, 
"version") ) func resource(typ *pbresource.Type, ten *pbresource.Tenancy, name string) *pbresource.Resource { @@ -589,7 +579,6 @@ func resource(typ *pbresource.Type, ten *pbresource.Tenancy, name string) *pbres Name: name, Uid: "a", }, - Version: "1", } } diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go index 8b5cb2c9716..6360db88b63 100644 --- a/internal/storage/inmem/backend.go +++ b/internal/storage/inmem/backend.go @@ -2,6 +2,10 @@ package inmem import ( "context" + "strconv" + "sync/atomic" + + "google.golang.org/protobuf/proto" "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/proto-public/pbresource" @@ -18,11 +22,14 @@ func NewBackend() (*Backend, error) { if err != nil { return nil, err } - return &Backend{store}, nil + return &Backend{store: store}, nil } // Backend is a purely in-memory storage backend implementation. -type Backend struct{ store *Store } +type Backend struct { + store *Store + vsn uint64 +} // Run until the given context is canceled. This method blocks, so should be // called in a goroutine. @@ -39,11 +46,14 @@ func (b *Backend) ReadConsistent(_ context.Context, id *pbresource.ID) (*pbresou } // WriteCAS implements the storage.Backend interface. -func (b *Backend) WriteCAS(_ context.Context, res *pbresource.Resource, version string) (*pbresource.Resource, error) { - if err := b.store.WriteCAS(res, version); err != nil { +func (b *Backend) WriteCAS(_ context.Context, res *pbresource.Resource) (*pbresource.Resource, error) { + stored := proto.Clone(res).(*pbresource.Resource) + stored.Version = strconv.Itoa(int(atomic.AddUint64(&b.vsn, 1))) + + if err := b.store.WriteCAS(stored, res.Version); err != nil { return nil, err } - return res, nil + return stored, nil } // DeleteCAS implements the storage.Backend interface. diff --git a/internal/storage/storage.go b/internal/storage/storage.go index e2b698e1bc9..17e4e6f4d79 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -56,8 +56,8 @@ var ( // # Read-Modify-Write Patterns // // All writes at the storage backend level are CAS (Compare-And-Swap) operations -// where the caller must provide the resource in its entirety, along with the -// current version string. +// where the caller must provide the resource in its entirety, with the current +// version string. // // Non-CAS writes should be implemented at a higher level (i.e. in the Resource // Service) by reading the resource, applying the user's requested modifications, @@ -110,7 +110,7 @@ type Backend interface { // WriteCAS performs an atomic CAS (Compare-And-Swap) write of a resource based // on its version. The given version will be compared to what is stored, and if // it does not match, ErrCASFailure will be returned. To create new resources, - // pass an empty version string. + // set version to an empty string. // // If a write cannot be performed because of a consistency or availability // issue (e.g. when interacting with a Raft follower, or when quorum is lost) @@ -129,7 +129,7 @@ type Backend interface { // resource stored in an older form with a newer, and vice versa. // // See Backend docs for more details. - WriteCAS(ctx context.Context, res *pbresource.Resource, version string) (*pbresource.Resource, error) + WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) // DeleteCAS performs an atomic CAS (Compare-And-Swap) deletion of a resource // based on its version. 
The given version will be compared to what is stored, diff --git a/proto/private/prototest/testing.go b/proto/private/prototest/testing.go index bf25fb0a10e..33dbf10bf11 100644 --- a/proto/private/prototest/testing.go +++ b/proto/private/prototest/testing.go @@ -73,3 +73,17 @@ func AssertElementsMatch[V any]( t.Fatalf("assertion failed: slices do not have matching elements\n--- expected\n+++ actual\n%v", diff) } } + +func AssertContainsElement[V any](t testing.TB, list []V, element V, opts ...cmp.Option) { + t.Helper() + + opts = append(opts, protocmp.Transform()) + + for _, e := range list { + if cmp.Equal(e, element, opts...) { + return + } + } + + t.Fatalf("assertion failed: list does not contain element\n--- list\n%#v\n--- element: %#v", list, element) +} From d7ac1c610c5009536129892f7d97844c3ca2661c Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 17 Mar 2023 15:03:28 +0000 Subject: [PATCH 17/24] storage: make consistency an argument to Read --- internal/storage/conformance/conformance.go | 24 +++++------ internal/storage/inmem/backend.go | 7 +-- internal/storage/storage.go | 48 +++++++++++++++------ 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index ce3f5159207..72cbd92feb9 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -31,7 +31,7 @@ type TestOptions struct { func Test(t *testing.T, opts TestOptions) { require.NotNil(t, opts.NewBackend, "NewBackend method is required") - t.Run("Read", func(t *testing.T) { testRead(t, opts) }) + t.Run("Read", func(t *testing.T) { testRead(t, opts, storage.EventualConsistency) }) t.Run("CAS Write", func(t *testing.T) { testCASWrite(t, opts) }) t.Run("CAS Delete", func(t *testing.T) { testCASDelete(t, opts) }) t.Run("OwnerReferences", func(t *testing.T) { testOwnerReferences(t, opts) }) @@ -39,7 +39,7 @@ func Test(t *testing.T, opts TestOptions) { testListWatch(t, opts) } -func testRead(t *testing.T, opts TestOptions) { +func testRead(t *testing.T, opts TestOptions, consistency storage.ReadConsistency) { ctx := testContext(t) backend := opts.NewBackend(t) @@ -55,7 +55,7 @@ func testRead(t *testing.T, opts TestOptions) { require.NoError(t, err) t.Run("simple", func(t *testing.T) { - output, err := backend.Read(ctx, res.Id) + output, err := backend.Read(ctx, consistency, res.Id) require.NoError(t, err) prototest.AssertDeepEqual(t, res, output, ignoreVersion) }) @@ -64,7 +64,7 @@ func testRead(t *testing.T, opts TestOptions) { id := clone(res.Id) id.Uid = "" - output, err := backend.Read(ctx, id) + output, err := backend.Read(ctx, consistency, id) require.NoError(t, err) prototest.AssertDeepEqual(t, res, output, ignoreVersion) }) @@ -73,7 +73,7 @@ func testRead(t *testing.T, opts TestOptions) { id := clone(res.Id) id.Name = "different" - _, err := backend.Read(ctx, id) + _, err := backend.Read(ctx, consistency, id) require.ErrorIs(t, err, storage.ErrNotFound) }) @@ -81,7 +81,7 @@ func testRead(t *testing.T, opts TestOptions) { id := clone(res.Id) id.Uid = "b" - _, err := backend.Read(ctx, id) + _, err := backend.Read(ctx, consistency, id) require.ErrorIs(t, err, storage.ErrNotFound) }) @@ -89,7 +89,7 @@ func testRead(t *testing.T, opts TestOptions) { id := clone(res.Id) id.Type = typeAv2 - _, err := backend.Read(ctx, id) + _, err := backend.Read(ctx, consistency, id) require.Error(t, err) var e storage.GroupVersionMismatchError @@ -200,7 +200,7 @@ func testCASDelete(t *testing.T, 
opts TestOptions) { require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version)) - _, err = backend.Read(ctx, res.Id) + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) require.ErrorIs(t, err, storage.ErrNotFound) }) @@ -222,7 +222,7 @@ func testCASDelete(t *testing.T, opts TestOptions) { id.Uid = "b" require.NoError(t, backend.DeleteCAS(ctx, id, res.Version)) - _, err = backend.Read(ctx, res.Id) + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) require.NoError(t, err) }) } @@ -427,13 +427,13 @@ func testListWatch(t *testing.T, opts TestOptions) { prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) // Check that Read is sequentially consistent with Watch. - readRes, err := backend.Read(ctx, event.Resource.Id) + readRes, err := backend.Read(ctx, storage.EventualConsistency, event.Resource.Id) require.NoError(t, err) prototest.AssertDeepEqual(t, event.Resource, readRes) } // Delete a random resource to check we get an event. - del, err := backend.Read(ctx, tc.results[rand.Intn(len(tc.results))].Id) + del, err := backend.Read(ctx, storage.EventualConsistency, tc.results[rand.Intn(len(tc.results))].Id) require.NoError(t, err) require.NoError(t, backend.DeleteCAS(ctx, del.Id, del.Version)) @@ -447,7 +447,7 @@ func testListWatch(t *testing.T, opts TestOptions) { prototest.AssertDeepEqual(t, del, event.Resource) // Check that Read is sequentially consistent with Watch. - _, err = backend.Read(ctx, del.Id) + _, err = backend.Read(ctx, storage.EventualConsistency, del.Id) require.ErrorIs(t, err, storage.ErrNotFound) }) } diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go index 6360db88b63..11bcaeebd61 100644 --- a/internal/storage/inmem/backend.go +++ b/internal/storage/inmem/backend.go @@ -36,12 +36,7 @@ type Backend struct { func (b *Backend) Run(ctx context.Context) { b.store.Run(ctx) } // Read implements the storage.Backend interface. -func (b *Backend) Read(_ context.Context, id *pbresource.ID) (*pbresource.Resource, error) { - return b.store.Read(id) -} - -// ReadConsistent implements the storage.Backend interface. -func (b *Backend) ReadConsistent(_ context.Context, id *pbresource.ID) (*pbresource.Resource, error) { +func (b *Backend) Read(_ context.Context, _ storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) { return b.store.Read(id) } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 17e4e6f4d79..6f8a2f8ac8f 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -31,6 +31,39 @@ var ( ErrInconsistent = errors.New("cannot satisfy consistency requirements") ) +// ReadConsistency is used to specify the required consistency guarantees for +// a read operation. +type ReadConsistency int + +const ( + // EventualConsistency provides a weak set of guarantees, but is much cheaper + // than using StrongConsistency and therefore should be treated as the default. + // + // It guarantees [sequential consistency] between reads. That is, a read will + // always return results that are as up-to-date as an earlier read, provided + // both happen on the same Consul server. But does not make any such guarantee + // about writes. + // + // In other words, reads won't necessarily reflect earlier writes, even when + // made against the same server. + // + // Operations that don't allow the caller to specify the consistency mode will + // hold the same guarantees as EventualConsistency, but check the method docs + // for caveats. 
+ // + // [sequential consistency]: https://jepsen.io/consistency/models/sequential + EventualConsistency ReadConsistency = iota + + // StrongConsistency provides a very strong set of guarantees but is much more + // expensive, so should be used sparingly. + // + // It guarantees full [linearizability], such that a read will always return + // the most up-to-date version of a resource, without caveat. + // + // [linearizability]: https://jepsen.io/consistency/models/linearizable + StrongConsistency +) + // Backend provides the low-level storage substrate for resources. It can be // implemented using internal (i.e. Raft+MemDB) or external (e.g. DynamoDB) // storage systems. @@ -93,19 +126,8 @@ type Backend interface { // // # Consistency // - // Read makes no guarantees about consistency, and may return stale results. - // For stronger guarantees, use ReadConsistent. - Read(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) - - // ReadConsistent provides the same functionality as Read, but guarantees - // single-resource sequential consistency, typically by bypassing any caches - // and proxying the request directly to the underlying storage system. - // - // If a consistent read cannot be achieved (e.g. when interacting with a Raft - // follower, or quorum is lost) ErrInconsistent will be returned. - // - // Use ReadConsistent sparingly, and prefer Read when possible. - ReadConsistent(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) + // Read supports both EventualConsistency and StrongConsistency. + Read(ctx context.Context, consistency ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) // WriteCAS performs an atomic CAS (Compare-And-Swap) write of a resource based // on its version. The given version will be compared to what is stored, and if From ba94207472862c21c23481aa74b28bb26b87d9e5 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 17 Mar 2023 15:10:05 +0000 Subject: [PATCH 18/24] storage: add consistency parameter to List --- internal/storage/conformance/conformance.go | 2 +- internal/storage/inmem/backend.go | 2 +- internal/storage/storage.go | 9 +++++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index 72cbd92feb9..7784ed1f2e5 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -369,7 +369,7 @@ func testListWatch(t *testing.T, opts TestOptions) { for desc, tc := range testCases { t.Run(desc, func(t *testing.T) { - res, err := backend.List(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) + res, err := backend.List(ctx, storage.EventualConsistency, tc.resourceType, tc.tenancy, tc.namePrefix) require.NoError(t, err) prototest.AssertElementsMatch(t, res, tc.results, ignoreVersion) }) diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go index 11bcaeebd61..acc63adbbf7 100644 --- a/internal/storage/inmem/backend.go +++ b/internal/storage/inmem/backend.go @@ -57,7 +57,7 @@ func (b *Backend) DeleteCAS(_ context.Context, id *pbresource.ID, version string } // List implements the storage.Backend interface. 
-func (b *Backend) List(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
+func (b *Backend) List(_ context.Context, _ storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
 	return b.store.List(resType, tenancy, namePrefix)
 }
 
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index 6f8a2f8ac8f..221558cb576 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -198,8 +198,13 @@ type Backend interface {
 	//
 	// # Consistency
 	//
-	// List makes no guarantees about consistency, and may return stale results.
-	List(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error)
+	// Generally, List only supports EventualConsistency. However, for backward
+	// compatibility with our v1 APIs, the Raft backend supports StrongConsistency
+	// for list operations.
+	//
+	// When the v1 APIs finally go away, so will this consistency parameter, so
+	// it should not be depended on outside of the backward compatibility layer.
+	List(ctx context.Context, consistency ReadConsistency, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error)
 
 	// WatchList watches resources of the given type, tenancy, and optionally
 	// matching the given name prefix. Upsert events for the current state of the

From 6e39fd5c41f5e557c40d4ca1d055820323f17d02 Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Fri, 17 Mar 2023 15:17:26 +0000
Subject: [PATCH 19/24] storage: more correct consistency documentation

---
 internal/storage/storage.go | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index 221558cb576..a87b5df2d4d 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -39,10 +39,9 @@ const (
 	// EventualConsistency provides a weak set of guarantees, but is much cheaper
 	// than using StrongConsistency and therefore should be treated as the default.
 	//
-	// It guarantees [sequential consistency] between reads. That is, a read will
-	// always return results that are as up-to-date as an earlier read, provided
-	// both happen on the same Consul server. But does not make any such guarantee
-	// about writes.
+	// It guarantees [monotonic reads]. That is, a read will always return results
+	// that are as up-to-date as an earlier read, provided both happen on the same
+	// Consul server. But it does not make any such guarantee about writes.
 	//
 	// In other words, reads won't necessarily reflect earlier writes, even when
 	// made against the same server.
@@ -51,7 +50,7 @@ const (
 	// hold the same guarantees as EventualConsistency, but check the method docs
 	// for caveats.
 	//
-	// [sequential consistency]: https://jepsen.io/consistency/models/sequential
+	// [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads
 	EventualConsistency ReadConsistency = iota
 
 	// StrongConsistency provides a very strong set of guarantees but is much more
 	// expensive, so should be used sparingly.
@@ -218,18 +217,20 @@ type Backend interface {
 	// write may not be received immediately), but it does guarantee that events
 	// will be emitted in the correct order.
 	//
-	// There's also a sequential consistency guarantee between Read and WatchList,
+	// There's also a guarantee of [monotonic reads] between Read and WatchList,
 	// such that Read will never return data that is older than the most recent
 	// event you received. Note: this guarantee holds at the (in-process) storage
 	// backend level, only. Controllers and other users of the Resource Service API
 	// must remain connected to the same Consul server process to avoid receiving
 	// events about writes that they then cannot read. In other words, it is *not*
-	// linearizable: https://jepsen.io/consistency/models/sequential
+	// linearizable.
 	//
 	// There's a similar guarantee between WatchList and OwnerReferences, see the
 	// OwnerReferences docs for more information.
 	//
 	// See List docs for details about Tenancy Wildcard and GroupVersion.
+	//
+	// [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads
 	WatchList(ctx context.Context, resType UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (Watch, error)
 
 	// OwnerReferences returns the IDs of resources owned by the resource with the
@@ -237,7 +238,7 @@ type Backend interface {
 	//
 	// # Consistency
 	//
-	// OwnerReferences may return stale results, but is sequentially consistent
+	// OwnerReferences may return stale results, but guarantees [monotonic reads]
 	// with events received from WatchList. In practice, this means that if you
 	// learn that a resource has been deleted through a watch event, the results
 	// you receive from OwnerReferences will contain all references that existed
 	// at the time the owner was deleted. It doesn't make any guarantees about
 	// references that are created *after* the owner was deleted, though, so you
 	// must either prevent that from happening (e.g. by performing a consistent
 	// read of the owner in the write-path, which has its own ordering/correctness
 	// challenges), or by calling OwnerReferences after the expected window of
 	// inconsistency (e.g. deferring cascading deletion, or doing a second pass
 	// an hour later).
+	//
+	// [monotonic reads]: https://jepsen.io/consistency/models/monotonic-reads
 	OwnerReferences(ctx context.Context, id *pbresource.ID) ([]*pbresource.ID, error)
 }

From 4580e51d7c172f8bca2a59ee7bbc6dea0a1992b1 Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Fri, 17 Mar 2023 16:24:52 +0000
Subject: [PATCH 20/24] storage: fix integer alignment in inmem backend

---
 internal/storage/inmem/backend.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/internal/storage/inmem/backend.go b/internal/storage/inmem/backend.go
index acc63adbbf7..d14358aa153 100644
--- a/internal/storage/inmem/backend.go
+++ b/internal/storage/inmem/backend.go
@@ -27,8 +27,9 @@ func NewBackend() (*Backend, error) {
 
 // Backend is a purely in-memory storage backend implementation.
 type Backend struct {
+	vsn uint64
+
 	store *Store
-	vsn   uint64
 }
 
 // Run until the given context is canceled.
This method blocks, so should be From 0748073cf10f1f62c5e971020b3d7a0c442fc425 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Mon, 20 Mar 2023 11:26:11 +0000 Subject: [PATCH 21/24] storage: support eventual consistency in conformance tests --- internal/storage/conformance/conformance.go | 244 +++++++++++++------- internal/storage/inmem/backend_test.go | 1 + internal/storage/storage.go | 11 + proto/private/prototest/testing.go | 13 +- 4 files changed, 184 insertions(+), 85 deletions(-) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index 7784ed1f2e5..8c0e114bd08 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -15,23 +15,25 @@ import ( "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/prototest" + "github.com/hashicorp/consul/sdk/testutil/retry" ) type TestOptions struct { // NewBackend will be called to construct a storage.Backend to run the tests // against. NewBackend func(t *testing.T) storage.Backend + + // SupportsStronglyConsistentList indicates whether the given storage backend + // supports strongly consistent list operations. + SupportsStronglyConsistentList bool } // Test runs a suite of tests against a storage.Backend implementation to check // it correctly implements our required behaviours. -// -// Note: it currently checks for stronger consistency than we actually need. We -// will need to handle eventual consistency when we implement the Raft backend. func Test(t *testing.T, opts TestOptions) { require.NotNil(t, opts.NewBackend, "NewBackend method is required") - t.Run("Read", func(t *testing.T) { testRead(t, opts, storage.EventualConsistency) }) + t.Run("Read", func(t *testing.T) { testRead(t, opts) }) t.Run("CAS Write", func(t *testing.T) { testCASWrite(t, opts) }) t.Run("CAS Delete", func(t *testing.T) { testCASDelete(t, opts) }) t.Run("OwnerReferences", func(t *testing.T) { testOwnerReferences(t, opts) }) @@ -39,67 +41,107 @@ func Test(t *testing.T, opts TestOptions) { testListWatch(t, opts) } -func testRead(t *testing.T, opts TestOptions, consistency storage.ReadConsistency) { +func testRead(t *testing.T, opts TestOptions) { ctx := testContext(t) - backend := opts.NewBackend(t) - res := &pbresource.Resource{ - Id: &pbresource.ID{ - Type: typeAv1, - Tenancy: tenancyDefault, - Name: "web", - Uid: "a", - }, - } - _, err := backend.WriteCAS(ctx, res) - require.NoError(t, err) + for consistency, check := range map[storage.ReadConsistency]consistencyChecker{ + storage.EventualConsistency: eventually, + storage.StrongConsistency: immediately, + } { + t.Run(consistency.String(), func(t *testing.T) { + res := &pbresource.Resource{ + Id: &pbresource.ID{ + Type: typeAv1, + Tenancy: tenancyDefault, + Name: "web", + Uid: "a", + }, + } + + t.Run("simple", func(t *testing.T) { + backend := opts.NewBackend(t) - t.Run("simple", func(t *testing.T) { - output, err := backend.Read(ctx, consistency, res.Id) - require.NoError(t, err) - prototest.AssertDeepEqual(t, res, output, ignoreVersion) - }) + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) - t.Run("no uid", func(t *testing.T) { - id := clone(res.Id) - id.Uid = "" + check(t, func(t testingT) { + output, err := backend.Read(ctx, consistency, res.Id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) + }) + }) - output, err := backend.Read(ctx, consistency, id) - 
require.NoError(t, err) - prototest.AssertDeepEqual(t, res, output, ignoreVersion) - }) + t.Run("no uid", func(t *testing.T) { + backend := opts.NewBackend(t) - t.Run("different id", func(t *testing.T) { - id := clone(res.Id) - id.Name = "different" + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) - _, err := backend.Read(ctx, consistency, id) - require.ErrorIs(t, err, storage.ErrNotFound) - }) + id := clone(res.Id) + id.Uid = "" - t.Run("different uid", func(t *testing.T) { - id := clone(res.Id) - id.Uid = "b" + check(t, func(t testingT) { + output, err := backend.Read(ctx, consistency, id) + require.NoError(t, err) + prototest.AssertDeepEqual(t, res, output, ignoreVersion) + }) + }) - _, err := backend.Read(ctx, consistency, id) - require.ErrorIs(t, err, storage.ErrNotFound) - }) + t.Run("different id", func(t *testing.T) { + backend := opts.NewBackend(t) - t.Run("different GroupVersion", func(t *testing.T) { - id := clone(res.Id) - id.Type = typeAv2 + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) - _, err := backend.Read(ctx, consistency, id) - require.Error(t, err) + id := clone(res.Id) + id.Name = "different" + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("different uid", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Uid = "b" + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) + }) + + t.Run("different GroupVersion", func(t *testing.T) { + backend := opts.NewBackend(t) + + _, err := backend.WriteCAS(ctx, res) + require.NoError(t, err) + + id := clone(res.Id) + id.Type = typeAv2 + + check(t, func(t testingT) { + _, err := backend.Read(ctx, consistency, id) + require.Error(t, err) + + var e storage.GroupVersionMismatchError + if errors.As(err, &e) { + require.Equal(t, id.Type, e.RequestedType) + prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion) + } else { + t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err) + } + }) + }) + }) + } - var e storage.GroupVersionMismatchError - if errors.As(err, &e) { - require.Equal(t, id.Type, e.RequestedType) - prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion) - } else { - t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err) - } - }) } func testCASWrite(t *testing.T, opts TestOptions) { @@ -200,8 +242,10 @@ func testCASDelete(t *testing.T, opts TestOptions) { require.NoError(t, backend.DeleteCAS(ctx, res.Id, res.Version)) - _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) - require.ErrorIs(t, err, storage.ErrNotFound) + eventually(t, func(t testingT) { + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) + require.ErrorIs(t, err, storage.ErrNotFound) + }) }) t.Run("uid must match", func(t *testing.T) { @@ -222,8 +266,10 @@ func testCASDelete(t *testing.T, opts TestOptions) { id.Uid = "b" require.NoError(t, backend.DeleteCAS(ctx, id, res.Version)) - _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) - require.NoError(t, err) + eventually(t, func(t testingT) { + _, err = backend.Read(ctx, storage.EventualConsistency, res.Id) + require.NoError(t, err) + }) }) } @@ -359,19 +405,32 @@ func testListWatch(t *testing.T, opts TestOptions) { } t.Run("List", func(t *testing.T) { - backend := opts.NewBackend(t) ctx := testContext(t) - for _, r := range seedData { - 
_, err := backend.WriteCAS(ctx, r) - require.NoError(t, err) + consistencyModes := map[storage.ReadConsistency]consistencyChecker{ + storage.EventualConsistency: eventually, + } + if opts.SupportsStronglyConsistentList { + consistencyModes[storage.StrongConsistency] = immediately } - for desc, tc := range testCases { - t.Run(desc, func(t *testing.T) { - res, err := backend.List(ctx, storage.EventualConsistency, tc.resourceType, tc.tenancy, tc.namePrefix) - require.NoError(t, err) - prototest.AssertElementsMatch(t, res, tc.results, ignoreVersion) + for consistency, check := range consistencyModes { + t.Run(consistency.String(), func(t *testing.T) { + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + backend := opts.NewBackend(t) + for _, r := range seedData { + _, err := backend.WriteCAS(ctx, r) + require.NoError(t, err) + } + + check(t, func(t testingT) { + res, err := backend.List(ctx, consistency, tc.resourceType, tc.tenancy, tc.namePrefix) + require.NoError(t, err) + prototest.AssertElementsMatch(t, res, tc.results, ignoreVersion) + }) + }) + } }) } }) @@ -426,7 +485,7 @@ func testListWatch(t *testing.T, opts TestOptions) { require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, event.Operation) prototest.AssertContainsElement(t, tc.results, event.Resource, ignoreVersion) - // Check that Read is sequentially consistent with Watch. + // Check that Read implements "monotonic reads" with Watch. readRes, err := backend.Read(ctx, storage.EventualConsistency, event.Resource.Id) require.NoError(t, err) prototest.AssertDeepEqual(t, event.Resource, readRes) @@ -446,7 +505,7 @@ func testListWatch(t *testing.T, opts TestOptions) { require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, event.Operation) prototest.AssertDeepEqual(t, del, event.Resource) - // Check that Read is sequentially consistent with Watch. + // Check that Read implements "monotonic reads" with Watch. 
_, err = backend.Read(ctx, storage.EventualConsistency, del.Id) require.ErrorIs(t, err, storage.ErrNotFound) }) @@ -490,33 +549,41 @@ func testOwnerReferences(t *testing.T, opts TestOptions) { }) require.NoError(t, err) - refs, err := backend.OwnerReferences(ctx, owner.Id) - require.NoError(t, err) - prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) t.Run("references are anchored to a specific uid", func(t *testing.T) { id := clone(owner.Id) id.Uid = "different" - refs, err := backend.OwnerReferences(ctx, id) - require.NoError(t, err) - require.Empty(t, refs) + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, id) + require.NoError(t, err) + require.Empty(t, refs) + }) }) t.Run("deleting the owner doesn't remove the references", func(t *testing.T) { require.NoError(t, backend.DeleteCAS(ctx, owner.Id, owner.Version)) - refs, err = backend.OwnerReferences(ctx, owner.Id) - require.NoError(t, err) - prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id, r2.Id}) + }) }) t.Run("deleting the owned resource removes its reference", func(t *testing.T) { require.NoError(t, backend.DeleteCAS(ctx, r2.Id, r2.Version)) - refs, err = backend.OwnerReferences(ctx, owner.Id) - require.NoError(t, err) - prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id}) + eventually(t, func(t testingT) { + refs, err := backend.OwnerReferences(ctx, owner.Id) + require.NoError(t, err) + prototest.AssertElementsMatch(t, refs, []*pbresource.ID{r1.Id}) + }) }) } @@ -589,3 +656,20 @@ func testContext(t *testing.T) context.Context { } func clone[T proto.Message](v T) T { return proto.Clone(v).(T) } + +type testingT interface { + require.TestingT + prototest.TestingT +} + +type consistencyChecker func(t *testing.T, fn func(testingT)) + +func eventually(t *testing.T, fn func(testingT)) { + t.Helper() + retry.Run(t, func(r *retry.R) { fn(r) }) +} + +func immediately(t *testing.T, fn func(testingT)) { + t.Helper() + fn(t) +} diff --git a/internal/storage/inmem/backend_test.go b/internal/storage/inmem/backend_test.go index 1620ec29953..fcfeb896c9c 100644 --- a/internal/storage/inmem/backend_test.go +++ b/internal/storage/inmem/backend_test.go @@ -23,5 +23,6 @@ func TestBackend_Conformance(t *testing.T) { return backend }, + SupportsStronglyConsistentList: true, }) } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index a87b5df2d4d..3ae33d6d42f 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -63,6 +63,17 @@ const ( StrongConsistency ) +// String implements the fmt.Stringer interface. +func (c ReadConsistency) String() string { + switch c { + case EventualConsistency: + return "Eventual Consistency" + case StrongConsistency: + return "Strong Consistency" + } + panic(fmt.Sprintf("unknown ReadConsistency (%d)", c)) +} + // Backend provides the low-level storage substrate for resources. It can be // implemented using internal (i.e. Raft+MemDB) or external (e.g. DynamoDB) // storage systems. 
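With the suite parameterized this way, a backend whose reads can lag its writes (unlike the inmem backend above) would be wired up roughly as follows. This is a sketch only: newRaftBackend is a hypothetical constructor standing in for whatever the backend under test provides, and is not part of this patch.

package raftstorage_test

import (
	"context"
	"testing"

	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/internal/storage/conformance"
)

func TestBackend_Conformance(t *testing.T) {
	conformance.Test(t, conformance.TestOptions{
		NewBackend: func(t *testing.T) storage.Backend {
			// newRaftBackend is a stand-in constructor for the backend
			// under test; it is not part of this patch.
			backend := newRaftBackend(t)

			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			go backend.Run(ctx)

			return backend
		},
		// Follower reads may lag the leader, so the suite asserts List
		// results with the retrying "eventually" checker rather than the
		// immediate one.
		SupportsStronglyConsistentList: false,
	})
}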
diff --git a/proto/private/prototest/testing.go b/proto/private/prototest/testing.go
index 33dbf10bf11..1baafa2c611 100644
--- a/proto/private/prototest/testing.go
+++ b/proto/private/prototest/testing.go
@@ -1,13 +1,16 @@
 package prototest

 import (
-	"testing"
-
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/protobuf/testing/protocmp"
 )

-func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) {
+type TestingT interface {
+	Helper()
+	Fatalf(string, ...any)
+}
+
+func AssertDeepEqual(t TestingT, x, y interface{}, opts ...cmp.Option) {
 	t.Helper()

 	opts = append(opts, protocmp.Transform())
@@ -24,7 +27,7 @@ func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) {
 //
 //	prototest.AssertElementsMatch(t, [1, 3, 2, 3], [1, 3, 3, 2])
 func AssertElementsMatch[V any](
-	t testing.TB, listX, listY []V, opts ...cmp.Option,
+	t TestingT, listX, listY []V, opts ...cmp.Option,
 ) {
 	t.Helper()

@@ -74,7 +77,7 @@ func AssertElementsMatch[V any](
 	}
 }

-func AssertContainsElement[V any](t testing.TB, list []V, element V, opts ...cmp.Option) {
+func AssertContainsElement[V any](t TestingT, list []V, element V, opts ...cmp.Option) {
 	t.Helper()

 	opts = append(opts, protocmp.Transform())

From 0a49eff9b754a395985b2ddc93fe1e6536be0df3 Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Mon, 20 Mar 2023 11:29:53 +0000
Subject: [PATCH 22/24] storage: correct eventLock comment

---
 internal/storage/inmem/store.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go
index 779f36f22cb..ea448eb0b5a 100644
--- a/internal/storage/inmem/store.go
+++ b/internal/storage/inmem/store.go
@@ -26,11 +26,11 @@ type Store struct {
 	// concurrent writers.
 	//
 	// We cannot rely on MemDB's write lock for this, because events must be
-	// published *after* the transaction is committed to provide sequential
-	// consistency between Watch and Read calls. In other words, if we were to
-	// publish an event before the transaction was committed, there would be a
-	// small window of time where a watcher (e.g. controller) could try to Read
-	// the resource and not get the version they were notified about.
+	// published *after* the transaction is committed to provide monotonic reads
+	// between Watch and Read calls. In other words, if we were to publish an event
+	// before the transaction was committed, there would be a small window of time
+	// where a watcher (e.g. controller) could try to Read the resource and not get
+	// the version they were notified about.
 	//
 	// Without this lock, it would be possible to publish events out-of-order.
 	eventLock sync.Mutex
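The corrected comment is the heart of the in-memory store's ordering story: commit the MemDB transaction first, then publish, all while holding eventLock. A minimal sketch of that pattern follows; every identifier other than go-memdb's is illustrative, and this is not the store's actual write path:

    package inmemsketch

    import (
        "sync"

        "github.com/hashicorp/go-memdb"
    )

    type store struct {
        db        *memdb.MemDB
        eventLock sync.Mutex
        publish   func(obj any) // stand-in for the real event publisher
    }

    func (s *store) write(table string, obj any) error {
        s.eventLock.Lock()
        defer s.eventLock.Unlock()

        tx := s.db.Txn(true)
        if err := tx.Insert(table, obj); err != nil {
            tx.Abort()
            return err
        }

        // Commit before publishing: once a watcher sees the event, a Read
        // is guaranteed to observe this version (or a newer one).
        tx.Commit()

        // Publishing while still holding eventLock keeps events in commit
        // order, so watchers never observe them out of order.
        s.publish(obj)
        return nil
    }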
From 8c753338e2961af333433f6bf7fac4692ae9518d Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Mon, 20 Mar 2023 11:38:45 +0000
Subject: [PATCH 23/24] storage: rearrange inmem store files

---
 internal/storage/inmem/event_index.go      |  32 ++++++
 .../storage/inmem/{query.go => schema.go}  |  47 ++++++++
 internal/storage/inmem/store.go            | 106 +++---------------
 3 files changed, 97 insertions(+), 88 deletions(-)
 create mode 100644 internal/storage/inmem/event_index.go
 rename internal/storage/inmem/{query.go => schema.go} (83%)

diff --git a/internal/storage/inmem/event_index.go b/internal/storage/inmem/event_index.go
new file mode 100644
index 00000000000..06d65b67c4e
--- /dev/null
+++ b/internal/storage/inmem/event_index.go
@@ -0,0 +1,32 @@
+package inmem
+
+import "github.com/hashicorp/go-memdb"
+
+type meta struct {
+	Key   string
+	Value any
+}
+
+func incrementEventIndex(tx *memdb.Txn) (uint64, error) {
+	idx, err := currentEventIndex(tx)
+	if err != nil {
+		return 0, err
+	}
+
+	idx++
+	if err := tx.Insert(tableNameMetadata, meta{Key: metaKeyEventIndex, Value: idx}); err != nil {
+		return 0, err
+	}
+	return idx, nil
+}
+
+func currentEventIndex(tx *memdb.Txn) (uint64, error) {
+	v, err := tx.First(tableNameMetadata, indexNameID, metaKeyEventIndex)
+	if err != nil {
+		return 0, err
+	}
+	if v == nil {
+		return 0, nil
+	}
+	return v.(meta).Value.(uint64), nil
+}
diff --git a/internal/storage/inmem/query.go b/internal/storage/inmem/schema.go
similarity index 83%
rename from internal/storage/inmem/query.go
rename to internal/storage/inmem/schema.go
index 0396947fc1c..2f63fb8ce6f 100644
--- a/internal/storage/inmem/query.go
+++ b/internal/storage/inmem/schema.go
@@ -5,10 +5,57 @@ import (
 	"fmt"
 	"strings"

+	"github.com/hashicorp/go-memdb"
+
 	"github.com/hashicorp/consul/internal/storage"
 	"github.com/hashicorp/consul/proto-public/pbresource"
 )

+const (
+	tableNameMetadata  = "metadata"
+	tableNameResources = "resources"
+
+	indexNameID    = "id"
+	indexNameOwner = "owner"
+
+	metaKeyEventIndex = "index"
+)
+
+func newDB() (*memdb.MemDB, error) {
+	return memdb.NewMemDB(&memdb.DBSchema{
+		Tables: map[string]*memdb.TableSchema{
+			tableNameMetadata: {
+				Name: tableNameMetadata,
+				Indexes: map[string]*memdb.IndexSchema{
+					indexNameID: {
+						Name:         indexNameID,
+						AllowMissing: false,
+						Unique:       true,
+						Indexer:      &memdb.StringFieldIndex{Field: "Key"},
+					},
+				},
+			},
+			tableNameResources: {
+				Name: tableNameResources,
+				Indexes: map[string]*memdb.IndexSchema{
+					indexNameID: {
+						Name:         indexNameID,
+						AllowMissing: false,
+						Unique:       true,
+						Indexer:      idIndexer{},
+					},
+					indexNameOwner: {
+						Name:         indexNameOwner,
+						AllowMissing: true,
+						Unique:       false,
+						Indexer:      ownerIndexer{},
+					},
+				},
+			},
+		},
+	})
+}
+
 // indexSeparator delimits the segments of our radix tree keys.
 const indexSeparator = "\x00"
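With the schema and event-index helpers split out, a write inside the inmem package can pair a resource insert with an event-index bump in a single MemDB transaction, so both become visible atomically on Commit. An illustrative sketch follows; writeWithIndex is not part of the patch, while the other identifiers are the package-internal names defined above:

    // Illustrative only: combine a resource insert with an event-index
    // increment in one transaction.
    func writeWithIndex(db *memdb.MemDB, res *pbresource.Resource) (uint64, error) {
        tx := db.Txn(true)
        defer tx.Abort() // no-op once Commit has run

        if err := tx.Insert(tableNameResources, res); err != nil {
            return 0, err
        }

        // Bump the counter stored under metaKeyEventIndex in the metadata
        // table; the returned value becomes the published event's index.
        idx, err := incrementEventIndex(tx)
        if err != nil {
            return 0, err
        }

        tx.Commit()
        return idx, nil
    }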
diff --git a/internal/storage/inmem/store.go b/internal/storage/inmem/store.go
index ea448eb0b5a..fc769a6e07e 100644
--- a/internal/storage/inmem/store.go
+++ b/internal/storage/inmem/store.go
@@ -40,38 +40,7 @@ type Store struct {
 //
 // You must call Run before using the store.
 func NewStore() (*Store, error) {
-	db, err := memdb.NewMemDB(&memdb.DBSchema{
-		Tables: map[string]*memdb.TableSchema{
-			tableNameMetadata: {
-				Name: tableNameMetadata,
-				Indexes: map[string]*memdb.IndexSchema{
-					indexNameID: {
-						Name:         indexNameID,
-						AllowMissing: false,
-						Unique:       true,
-						Indexer:      &memdb.StringFieldIndex{Field: "Key"},
-					},
-				},
-			},
-			tableNameResources: {
-				Name: tableNameResources,
-				Indexes: map[string]*memdb.IndexSchema{
-					indexNameID: {
-						Name:         indexNameID,
-						AllowMissing: false,
-						Unique:       true,
-						Indexer:      idIndexer{},
-					},
-					indexNameOwner: {
-						Name:         indexNameOwner,
-						AllowMissing: true,
-						Unique:       false,
-						Indexer:      ownerIndexer{},
-					},
-				},
-			},
-		},
-	})
+	db, err := newDB()
 	if err != nil {
 		return nil, err
 	}
@@ -229,6 +198,23 @@ func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, nameP
 	return listTxn(tx, query{typ, ten, namePrefix})
 }

+func listTxn(tx *memdb.Txn, q query) ([]*pbresource.Resource, error) {
+	iter, err := tx.Get(tableNameResources, indexNameID+"_prefix", q)
+	if err != nil {
+		return nil, err
+	}
+
+	list := make([]*pbresource.Resource, 0)
+	for v := iter.Next(); v != nil; v = iter.Next() {
+		res := v.(*pbresource.Resource)
+
+		if q.matches(res) {
+			list = append(list, res)
+		}
+	}
+	return list, nil
+}
+
 // WatchList watches resources of the given type, tenancy, and optionally
 // matching the given name prefix.
 //
@@ -286,59 +272,3 @@ func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) {
 	}
 	return refs, nil
 }
-
-const (
-	tableNameMetadata  = "metadata"
-	tableNameResources = "resources"
-
-	indexNameID    = "id"
-	indexNameOwner = "owner"
-
-	metaKeyEventIndex = "index"
-)
-
-func listTxn(tx *memdb.Txn, q query) ([]*pbresource.Resource, error) {
-	iter, err := tx.Get(tableNameResources, indexNameID+"_prefix", q)
-	if err != nil {
-		return nil, err
-	}
-
-	list := make([]*pbresource.Resource, 0)
-	for v := iter.Next(); v != nil; v = iter.Next() {
-		res := v.(*pbresource.Resource)
-
-		if q.matches(res) {
-			list = append(list, res)
-		}
-	}
-	return list, nil
-}
-
-type meta struct {
-	Key   string
-	Value any
-}
-
-func incrementEventIndex(tx *memdb.Txn) (uint64, error) {
-	idx, err := currentEventIndex(tx)
-	if err != nil {
-		return 0, err
-	}
-
-	idx++
-	if err := tx.Insert(tableNameMetadata, meta{Key: metaKeyEventIndex, Value: idx}); err != nil {
-		return 0, nil
-	}
-	return idx, nil
-}
-
-func currentEventIndex(tx *memdb.Txn) (uint64, error) {
-	v, err := tx.First(tableNameMetadata, indexNameID, metaKeyEventIndex)
-	if err != nil {
-		return 0, err
-	}
-	if v == nil {
-		return 0, nil
-	}
-	return v.(meta).Value.(uint64), nil
-}

From 8bc4a804bc364fcae8cfc7731211051a56babddd Mon Sep 17 00:00:00 2001
From: Daniel Upton
Date: Tue, 21 Mar 2023 14:57:44 +0000
Subject: [PATCH 24/24] storage: fix bug where watches could emit duplicate
 events

---
 internal/storage/inmem/event_index.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/internal/storage/inmem/event_index.go b/internal/storage/inmem/event_index.go
index 06d65b67c4e..58aa05e1d2c 100644
--- a/internal/storage/inmem/event_index.go
+++ b/internal/storage/inmem/event_index.go
@@ -26,7 +26,8 @@ func currentEventIndex(tx *memdb.Txn) (uint64, error) {
 		return 0, err
 	}
 	if v == nil {
-		return 0, nil
+		// 0 and 1 index are reserved for special use in the stream package.
+		return 2, nil
 	}
 	return v.(meta).Value.(uint64), nil
 }
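The one-line change in patch 24 is easy to miss: before it, a fresh store reported event index 0, which (per the code comment) collides with values the stream package reserves for special use, and the commit message ties that collision to duplicate watch events. The sketch below shows the invariant after the fix; exampleEventIndexes is illustrative, not part of the patch, and the exact stream-package semantics are assumed from the comment rather than shown here:

    // Illustrative sketch: a fresh store's current index is 2, and each
    // committed write strictly increases it, so no published event ever
    // reuses the stream package's reserved indexes 0 and 1.
    func exampleEventIndexes(db *memdb.MemDB) ([]uint64, error) {
        tx := db.Txn(true)
        defer tx.Abort()

        first, err := currentEventIndex(tx) // 2 on an empty metadata table
        if err != nil {
            return nil, err
        }

        next, err := incrementEventIndex(tx) // 3: strictly above the sentinels
        if err != nil {
            return nil, err
        }

        tx.Commit()
        return []uint64{first, next}, nil
    }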