diff --git a/.github/scripts/test-info.mjs b/.github/scripts/test-info.mjs index 9a659db6f6..18eac1c187 100644 --- a/.github/scripts/test-info.mjs +++ b/.github/scripts/test-info.mjs @@ -795,6 +795,23 @@ const components = { requiredSecrets: ['AzureSqlServerConnectionString'], sourcePkg: ['state/sqlserver', 'common/component/sql'], }, + 'state.sqlserver.v2': { + conformance: true, + certification: true, + conformanceSetup: 'docker-compose.sh sqlserver', + requiredSecrets: ['AzureSqlServerConnectionString'], + sourcePkg: ['state/sqlserver/v2', 'common/component/sql'], + }, + 'state.sqlserver.docker': { + conformance: true, + conformanceSetup: 'docker-compose.sh sqlserver', + sourcePkg: ['state/sqlserver', 'common/component/sql'], + }, + 'state.sqlserver.v2.docker': { + conformance: true, + conformanceSetup: 'docker-compose.sh sqlserver', + sourcePkg: ['state/sqlserver/v2', 'common/component/sql'], + }, // 'state.gcp.firestore.docker': { // conformance: true, // requireDocker: true, diff --git a/common/proto/state/sqlserver/test.pb.go b/common/proto/state/sqlserver/test.pb.go new file mode 100644 index 0000000000..1c936adb57 --- /dev/null +++ b/common/proto/state/sqlserver/test.pb.go @@ -0,0 +1,162 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v4.25.4 +// source: test.proto + +package sqlserver + +import ( + "reflect" + "sync" + + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/runtime/protoimpl" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TestEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + EventId int32 `protobuf:"varint,1,opt,name=eventId,proto3" json:"eventId,omitempty"` + Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` +} + +func (x *TestEvent) Reset() { + *x = TestEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestEvent) ProtoMessage() {} + +func (x *TestEvent) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestEvent.ProtoReflect.Descriptor instead. 
+func (*TestEvent) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{0} +} + +func (x *TestEvent) GetEventId() int32 { + if x != nil { + return x.EventId + } + return 0 +} + +func (x *TestEvent) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + +var File_test_proto protoreflect.FileDescriptor + +var file_test_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x5f, 0x0a, + 0x09, 0x54, 0x65, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x41, + 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x61, 0x70, + 0x72, 0x2f, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x2d, 0x63, 0x6f, 0x6e, + 0x74, 0x72, 0x69, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x73, 0x71, 0x6c, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_proto_rawDescOnce sync.Once + file_test_proto_rawDescData = file_test_proto_rawDesc +) + +func file_test_proto_rawDescGZIP() []byte { + file_test_proto_rawDescOnce.Do(func() { + file_test_proto_rawDescData = 
protoimpl.X.CompressGZIP(file_test_proto_rawDescData) + }) + return file_test_proto_rawDescData +} + +var file_test_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_test_proto_goTypes = []interface{}{ + (*TestEvent)(nil), // 0: TestEvent + (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp +} +var file_test_proto_depIdxs = []int32{ + 1, // 0: TestEvent.timestamp:type_name -> google.protobuf.Timestamp + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_test_proto_init() } +func file_test_proto_init() { + if File_test_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_test_proto_goTypes, + DependencyIndexes: file_test_proto_depIdxs, + MessageInfos: file_test_proto_msgTypes, + }.Build() + File_test_proto = out.File + file_test_proto_rawDesc = nil + file_test_proto_goTypes = nil + file_test_proto_depIdxs = nil +} diff --git a/common/proto/state/sqlserver/test.proto b/common/proto/state/sqlserver/test.proto new file mode 100644 index 0000000000..7c84802598 --- /dev/null +++ b/common/proto/state/sqlserver/test.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +option go_package = "github.com/dapr/components-contrib/common/proto/state/sqlserver"; + +import "google/protobuf/timestamp.proto"; + +message TestEvent { 
+ int32 eventId = 1; + google.protobuf.Timestamp timestamp = 2; +} \ No newline at end of file diff --git a/state/sqlserver/v2/metadata.go b/state/sqlserver/v2/metadata.go new file mode 100644 index 0000000000..eeb2fdda4c --- /dev/null +++ b/state/sqlserver/v2/metadata.go @@ -0,0 +1,210 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqlserver + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + sqlserverAuth "github.com/dapr/components-contrib/common/authentication/sqlserver" + "github.com/dapr/kit/metadata" + "github.com/dapr/kit/ptr" +) + +const ( + keyColumnName = "Key" + rowVersionColumnName = "RowVersion" + + defaultKeyLength = 200 + defaultTable = "state" + defaultMetaTable = "dapr_metadata" + defaultCleanupInterval = time.Hour +) + +type sqlServerMetadata struct { + sqlserverAuth.SQLServerAuthMetadata `mapstructure:",squash"` + + TableName string + MetadataTableName string + KeyType string + KeyLength int + IndexedProperties string + CleanupInterval *time.Duration `mapstructure:"cleanupInterval" mapstructurealiases:"cleanupIntervalInSeconds"` + + // Internal properties + keyTypeParsed KeyType + keyLengthParsed int + indexedPropertiesParsed []IndexedProperty +} + +func newMetadata() sqlServerMetadata { + return sqlServerMetadata{ + TableName: defaultTable, + KeyLength: defaultKeyLength, + MetadataTableName: defaultMetaTable, + CleanupInterval: ptr.Of(defaultCleanupInterval), + } +} + +func (m *sqlServerMetadata) Parse(meta 
map[string]string) error { + // Reset first + m.SQLServerAuthMetadata.Reset() + + // Decode the metadata + err := metadata.DecodeMetadata(meta, &m) + if err != nil { + return err + } + + // Validate and parse the auth metadata + err = m.SQLServerAuthMetadata.Validate(meta) + if err != nil { + return err + } + + // Validate and sanitize more values + if !sqlserverAuth.IsValidSQLName(m.TableName) { + return errors.New("invalid table name, accepted characters are (A-Z, a-z, 0-9, _)") + } + if !sqlserverAuth.IsValidSQLName(m.MetadataTableName) { + return errors.New("invalid metadata table name, accepted characters are (A-Z, a-z, 0-9, _)") + } + + err = m.setKeyType() + if err != nil { + return err + } + err = m.setIndexedProperties() + if err != nil { + return err + } + + // Cleanup interval + if m.CleanupInterval != nil { + // Non-positive value from meta means disable auto cleanup. + if *m.CleanupInterval <= 0 { + val, _ := metadata.GetMetadataProperty(meta, "cleanupInterval", "cleanupIntervalInSeconds") + if val == "" { + // Unfortunately the mapstructure decoder decodes an empty string to 0, a missing key would be nil however + m.CleanupInterval = ptr.Of(defaultCleanupInterval) + } else { + m.CleanupInterval = nil + } + } + } + + return nil +} + +// Validates and returns the key type. +func (m *sqlServerMetadata) setKeyType() error { + if m.KeyType != "" { + kt, err := KeyTypeFromString(m.KeyType) + if err != nil { + return err + } + + m.keyTypeParsed = kt + } else { + m.keyTypeParsed = StringKeyType + } + + if m.keyTypeParsed != StringKeyType { + return nil + } + + if m.KeyLength <= 0 { + return fmt.Errorf("invalid key length value of %d", m.KeyLength) + } else { + m.keyLengthParsed = m.KeyLength + } + + return nil +} + +// Sets the validated index properties. 
+func (m *sqlServerMetadata) setIndexedProperties() error { + if m.IndexedProperties == "" { + return nil + } + + var indexedProperties []IndexedProperty + err := json.Unmarshal([]byte(m.IndexedProperties), &indexedProperties) + if err != nil { + return err + } + + err = m.validateIndexedProperties(indexedProperties) + if err != nil { + return err + } + + m.indexedPropertiesParsed = indexedProperties + + return nil +} + +// Validates that all the mandator index properties are supplied and that the +// values are valid. +func (m *sqlServerMetadata) validateIndexedProperties(indexedProperties []IndexedProperty) error { + for _, p := range indexedProperties { + if p.ColumnName == "" { + return errors.New("indexed property column cannot be empty") + } + + if p.Property == "" { + return errors.New("indexed property name cannot be empty") + } + + if p.Type == "" { + return errors.New("indexed property type cannot be empty") + } + + if !sqlserverAuth.IsValidSQLName(p.ColumnName) { + return errors.New("invalid indexed property column name, accepted characters are (A-Z, a-z, 0-9, _)") + } + + if !isValidIndexedPropertyName(p.Property) { + return errors.New("invalid indexed property name, accepted characters are (A-Z, a-z, 0-9, _, ., [, ])") + } + + if !isValidIndexedPropertyType(p.Type) { + return errors.New("invalid indexed property type, accepted characters are (A-Z, a-z, 0-9, _, (, ))") + } + } + + return nil +} + +func isValidIndexedPropertyName(s string) bool { + for _, c := range s { + if !(sqlserverAuth.IsLetterOrNumber(c) || (c == '_') || (c == '.') || (c == '[') || (c == ']')) { + return false + } + } + + return true +} + +func isValidIndexedPropertyType(s string) bool { + for _, c := range s { + if !(sqlserverAuth.IsLetterOrNumber(c) || (c == '(') || (c == ')')) { + return false + } + } + + return true +} diff --git a/state/sqlserver/v2/metadata.yaml b/state/sqlserver/v2/metadata.yaml new file mode 100644 index 0000000000..e0a79e03a1 --- /dev/null +++ 
b/state/sqlserver/v2/metadata.yaml @@ -0,0 +1,105 @@ +# yaml-language-server: $schema=../../component-metadata-schema.json +schemaVersion: "v1" +type: "state" +name: "sqlserver" +version: "v2" +status: "stable" +title: "SQL Server" +description: "Microsoft SQL Server and Azure SQL" +urls: + - title: "Reference" + url: "https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-sqlserver/" +capabilities: + # If actorStateStore is present, the metadata key actorStateStore can be used + - "actorStateStore" + - "crud" + - "transactional" + - "etag" + - "ttl" +authenticationProfiles: + - title: "Connection string" + description: | + Authenticates using a connection string + metadata: + - name: connectionString + required: true + sensitive: true + description: | + The connection string used to connect. + If the connection string contains the database, it must already exist. Otherwise, if the database is omitted, a default database named "Dapr" is created. + example: | + "Server=myServerName\myInstanceName;Database=myDataBase;User Id=myUsername;Password=myPassword;" +builtinAuthenticationProfiles: + - name: "azuread" + metadata: + - name: useAzureAD + required: true + type: bool + description: | + Must be set to `true` to enable the component to retrieve access tokens from Azure AD. + This authentication method only works with Azure SQL databases. + example: "true" + - name: connectionString + required: true + sensitive: true + description: | + The connection string or URL of the Azure SQL database, without credentials. + If the connection string contains the database, it must already exist. Otherwise, if the database is omitted, a default database named "Dapr" is created. + example: | + "sqlserver://myServerName.database.windows.net:1433?database=myDataBase" +metadata: + - name: tableName + description: | + The name of the table to use. Alpha-numeric with underscores. 
+ example: | + "table_name" + default: | + "state" + - name: metadataTableName + description: | + Name of the table Dapr uses to store metadata properties. + example: | + "dapr_metadata" + default: | + "dapr_metadata" + - name: schemaName + description: | + The schema to use. + example: | + "dapr" + default: | + "dbo" + - name: keyType + description: | + The type of key used + allowedValues: + - "string" + - "uuid" + - "integer" + default: | + "string" + example: | + "string" + - name: keyLength + type: number + description: | + The max length of key. Ignored if "keyType" is not `string`. + example: | + 200 + default: | + 200 + - name: indexedProperties + description: | + List of indexed properties, as a string containing a JSON document. + This will apply only to String data + example: | + '[{"column": "transactionid", "property": "id", "type": "int"}, {"column": "customerid", "property": "customer", "type": "nvarchar(100)"}]' + - name: cleanupInterval + type: number + description: | + Interval, in seconds, to clean up rows with an expired TTL. Default: 3600 (i.e. 1 hour). + Setting this to values <=0 disables the periodic cleanup. + default: | + "3600" + example: | + "1800", "-1" diff --git a/state/sqlserver/v2/migration.go b/state/sqlserver/v2/migration.go new file mode 100644 index 0000000000..0bc1925f4c --- /dev/null +++ b/state/sqlserver/v2/migration.go @@ -0,0 +1,351 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sqlserver + +import ( + "context" + "database/sql" + "fmt" +) + +type migrator interface { + executeMigrations(context.Context) (migrationResult, error) +} + +type migration struct { + metadata *sqlServerMetadata +} + +type migrationResult struct { + itemRefTableTypeName string + upsertProcName string + upsertProcFullName string + pkColumnType string + getCommand string + deleteWithETagCommand string + deleteWithoutETagCommand string +} + +func newMigration(metadata *sqlServerMetadata) migrator { + return &migration{ + metadata: metadata, + } +} + +func (m *migration) newMigrationResult() migrationResult { + r := migrationResult{ + itemRefTableTypeName: fmt.Sprintf("[%s].%s_Table", m.metadata.SchemaName, m.metadata.TableName), + upsertProcName: "sp_Upsert_v5_" + m.metadata.TableName, + getCommand: fmt.Sprintf("SELECT [Data], [BinaryData], [isBinary], [RowVersion], [ExpireDate] FROM [%s].[%s] WHERE [Key] = @Key AND ([ExpireDate] IS NULL OR [ExpireDate] > GETDATE())", m.metadata.SchemaName, m.metadata.TableName), + deleteWithETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key AND [RowVersion]=@RowVersion`, m.metadata.SchemaName, m.metadata.TableName), + deleteWithoutETagCommand: fmt.Sprintf(`DELETE [%s].[%s] WHERE [Key]=@Key`, m.metadata.SchemaName, m.metadata.TableName), + } + + r.upsertProcFullName = fmt.Sprintf("[%s].%s", m.metadata.SchemaName, r.upsertProcName) + + //nolint:exhaustive + switch m.metadata.keyTypeParsed { + case StringKeyType: + r.pkColumnType = fmt.Sprintf("NVARCHAR(%d)", m.metadata.keyLengthParsed) + + case UUIDKeyType: + r.pkColumnType = "uniqueidentifier" + + case IntegerKeyType: + r.pkColumnType = "int" + } + + return r +} + +/* #nosec. 
*/ +func (m *migration) executeMigrations(ctx context.Context) (migrationResult, error) { + r := m.newMigrationResult() + + conn, hasDatabase, err := m.metadata.GetConnector(false) + if err != nil { + return r, err + } + db := sql.OpenDB(conn) + + // If the user provides a database in the connection string do not attempt + // to create the database. This work as the component did before adding the + // support to create the db. + if hasDatabase { + // Schedule close of connection + defer db.Close() + } else { + err = m.ensureDatabaseExists(ctx, db) + if err != nil { + return r, fmt.Errorf("failed to create database: %w", err) + } + + // Close the existing connection + db.Close() + + // Re connect with a database-specific connection + conn, _, err = m.metadata.GetConnector(true) + if err != nil { + return r, err + } + db = sql.OpenDB(conn) + + // Schedule close of new connection + defer db.Close() + } + + err = m.ensureSchemaExists(ctx, db) + if err != nil { + return r, fmt.Errorf("failed to create db schema: %w", err) + } + + err = m.ensureTableExists(ctx, db, r) + if err != nil { + return r, fmt.Errorf("failed to create db table: %w", err) + } + + err = m.ensureStoredProcedureExists(ctx, db, r) + if err != nil { + return r, fmt.Errorf("failed to create stored procedures: %w", err) + } + + for _, ix := range m.metadata.indexedPropertiesParsed { + err = m.ensureIndexedPropertyExists(ctx, db, ix) + if err != nil { + return r, err + } + } + + return r, nil +} + +func runCommand(ctx context.Context, db *sql.DB, tsql string) error { + if _, err := db.ExecContext(ctx, tsql); err != nil { + return err + } + + return nil +} + +/* #nosec. 
*/ +func (m *migration) ensureIndexedPropertyExists(ctx context.Context, db *sql.DB, ix IndexedProperty) error { + indexName := "IX_" + ix.ColumnName + + tsql := fmt.Sprintf(` + IF (NOT EXISTS(SELECT object_id + FROM sys.indexes + WHERE object_id = OBJECT_ID('[%s].%s') + AND name='%s')) + CREATE INDEX %s ON [%s].[%s]([%s])`, + m.metadata.SchemaName, + m.metadata.TableName, + indexName, + indexName, + m.metadata.SchemaName, + m.metadata.TableName, + ix.ColumnName) + + return runCommand(ctx, db, tsql) +} + +/* #nosec. */ +func (m *migration) ensureDatabaseExists(ctx context.Context, db *sql.DB) error { + tsql := fmt.Sprintf(` +IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = N'%s') + CREATE DATABASE [%s]`, + m.metadata.DatabaseName, m.metadata.DatabaseName) + + return runCommand(ctx, db, tsql) +} + +/* #nosec. */ +func (m *migration) ensureSchemaExists(ctx context.Context, db *sql.DB) error { + tsql := fmt.Sprintf(` + IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = N'%s') + EXEC('CREATE SCHEMA [%s]')`, + m.metadata.SchemaName, m.metadata.SchemaName) + + return runCommand(ctx, db, tsql) +} + +/* #nosec. 
*/ +func (m *migration) ensureTableExists(ctx context.Context, db *sql.DB, r migrationResult) error { + tsql := fmt.Sprintf(` + IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s') + CREATE TABLE [%s].[%s] ( + [Key] %s CONSTRAINT PK_%s PRIMARY KEY, + [Data] NVARCHAR(MAX) NULL, + [BinaryData] VARBINARY(MAX) NULL, + [isBinary] BIT NOT NULL DEFAULT(0), + [InsertDate] DateTime2 NOT NULL DEFAULT(GETDATE()), + [UpdateDate] DateTime2 NULL, + [ExpireDate] DateTime2 NULL,`, + m.metadata.SchemaName, m.metadata.TableName, m.metadata.SchemaName, m.metadata.TableName, r.pkColumnType, m.metadata.TableName) + + for _, prop := range m.metadata.indexedPropertiesParsed { + if prop.Type != "" { + tsql += fmt.Sprintf("\n [%s] AS CONVERT(%s, JSON_VALUE(Data, '$.%s')) PERSISTED,", prop.ColumnName, prop.Type, prop.Property) + } else { + tsql += fmt.Sprintf("\n [%s] AS JSON_VALUE(Data, '$.%s') PERSISTED,", prop.ColumnName, prop.Property) + } + } + + tsql += ` + [RowVersion] ROWVERSION NOT NULL) + ` + + if err := runCommand(ctx, db, tsql); err != nil { + return err + } + + // Create metadata Table + tsql = fmt.Sprintf(` + IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%[1]s' AND TABLE_NAME = '%[2]s') + CREATE TABLE [%[1]s].[%[2]s] ( + [Key] %[3]s CONSTRAINT PK_%[4]s PRIMARY KEY, + [Value] NVARCHAR(MAX) NOT NULL + )`, m.metadata.SchemaName, m.metadata.MetadataTableName, r.pkColumnType, m.metadata.MetadataTableName) + if err := runCommand(ctx, db, tsql); err != nil { + return err + } + + return nil +} + +/* #nosec. 
*/ +func (m *migration) ensureTypeExists(ctx context.Context, db *sql.DB, mr migrationResult) error { + tsql := fmt.Sprintf(` + IF type_id('[%s].%s_Table') IS NULL + CREATE TYPE [%s].%s_Table AS TABLE + ( + [Key] %s NOT NULL, + [RowVersion] BINARY(8) + ) + `, m.metadata.SchemaName, m.metadata.TableName, m.metadata.SchemaName, m.metadata.TableName, mr.pkColumnType) + + return runCommand(ctx, db, tsql) +} + +func (m *migration) ensureStoredProcedureExists(ctx context.Context, db *sql.DB, mr migrationResult) error { + err := m.ensureTypeExists(ctx, db, mr) + if err != nil { + return err + } + + err = m.ensureUpsertStoredProcedureExists(ctx, db, mr) + if err != nil { + return err + } + + return nil +} + +/* #nosec. */ +func (m *migration) createStoredProcedureIfNotExists(ctx context.Context, db *sql.DB, name string, escapedDefinition string) error { + tsql := fmt.Sprintf(` + IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[%s].[%s]') AND type in (N'P', N'PC')) + BEGIN + execute ('%s') + END`, + m.metadata.SchemaName, + name, + escapedDefinition) + + return runCommand(ctx, db, tsql) +} + +/* #nosec. 
*/ +//nolint:dupword +func (m *migration) ensureUpsertStoredProcedureExists(ctx context.Context, db *sql.DB, mr migrationResult) error { + tsql := fmt.Sprintf(` + CREATE PROCEDURE %[1]s ( + @Key %[2]s, + @Data NVARCHAR(MAX), + @BinaryData VARBINARY(MAX), + @isBinary BIT, + @TTL INT, + @RowVersion BINARY(8), + @FirstWrite BIT + ) AS + BEGIN + IF (@FirstWrite=1) + BEGIN + IF (@RowVersion IS NOT NULL) + BEGIN + BEGIN TRANSACTION; + IF NOT EXISTS (SELECT * FROM [%[3]s] WHERE [Key]=@Key AND RowVersion = @RowVersion) + BEGIN + THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1 + END + BEGIN + UPDATE [%[3]s] + SET [Data]=@Data, [isBinary]=@isBinary, [BinaryData]=@BinaryData, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END + WHERE [Key]=@Key AND RowVersion = @RowVersion + END + COMMIT; + END + ELSE + BEGIN + BEGIN TRANSACTION; + IF EXISTS (SELECT * FROM [%[3]s] WHERE [Key]=@Key) + BEGIN + THROW 2601, ''FIRST-WRITE: COMPETING RECORD ALREADY WRITTEN.'', 1 + END + BEGIN + BEGIN TRY + INSERT INTO [%[3]s] ([Key], [Data], [isBinary], [BinaryData], ExpireDate) VALUES (@Key, @Data, @isBinary, @BinaryData, CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END) + END TRY + + BEGIN CATCH + IF ERROR_NUMBER() IN (2601, 2627) + UPDATE [%[3]s] + SET [Data]=@Data, [isBinary]=@isBinary, [BinaryData]=@BinaryData, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END + WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) + END CATCH + END + COMMIT; + END + END + ELSE + BEGIN + IF (@RowVersion IS NOT NULL) + BEGIN + UPDATE [%[3]s] + SET [Data]=@Data, [isBinary]=@isBinary, [BinaryData]=@BinaryData, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END + WHERE [Key]=@Key AND RowVersion = @RowVersion + RETURN + END + ELSE + BEGIN + BEGIN TRY + INSERT INTO [%[3]s] ([Key], [Data], 
[isBinary], [BinaryData], ExpireDate) VALUES (@Key, @Data, @isBinary, @BinaryData, CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END) + END TRY + + BEGIN CATCH + IF ERROR_NUMBER() IN (2601, 2627) + UPDATE [%[3]s] + SET [Data]=@Data, [isBinary]=@isBinary, [BinaryData]=@BinaryData, UpdateDate=GETDATE(), ExpireDate=CASE WHEN @TTL IS NULL THEN NULL ELSE DATEADD(SECOND, @TTL, GETDATE()) END + WHERE [Key]=@Key AND RowVersion = ISNULL(@RowVersion, RowVersion) + END CATCH + END + END + END + `, + mr.upsertProcFullName, + mr.pkColumnType, + m.metadata.TableName, + ) + + return m.createStoredProcedureIfNotExists(ctx, db, mr.upsertProcName, tsql) +} diff --git a/state/sqlserver/v2/sqlserver.go b/state/sqlserver/v2/sqlserver.go new file mode 100644 index 0000000000..5736ac68df --- /dev/null +++ b/state/sqlserver/v2/sqlserver.go @@ -0,0 +1,420 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqlserver + +import ( + "context" + "database/sql" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "reflect" + "time" + + commonsql "github.com/dapr/components-contrib/common/component/sql" + sqltransactions "github.com/dapr/components-contrib/common/component/sql/transactions" + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/utils" + "github.com/dapr/kit/logger" + "github.com/dapr/kit/ptr" +) + +// KeyType defines type of the table identifier. 
+type KeyType string + +// KeyTypeFromString tries to create a KeyType from a string value. +func KeyTypeFromString(k string) (KeyType, error) { + switch k { + case string(StringKeyType): + return StringKeyType, nil + case string(UUIDKeyType): + return UUIDKeyType, nil + case string(IntegerKeyType): + return IntegerKeyType, nil + } + + return InvalidKeyType, errors.New("invalid key type") +} + +const ( + // StringKeyType defines a key of type string. + StringKeyType KeyType = "string" + + // UUIDKeyType defines a key of type UUID/GUID. + UUIDKeyType KeyType = "uuid" + + // IntegerKeyType defines a key of type integer. + IntegerKeyType KeyType = "integer" + + // InvalidKeyType defines an invalid key type. + InvalidKeyType KeyType = "invalid" +) + +// New creates a new instance of a SQL Server transaction store. +func New(logger logger.Logger) state.Store { + s := &SQLServer{ + features: []state.Feature{ + state.FeatureETag, + state.FeatureTransactional, + state.FeatureTTL, + }, + logger: logger, + migratorFactory: newMigration, + } + s.BulkStore = state.NewDefaultBulkStore(s) + return s +} + +// IndexedProperty defines a indexed property. +type IndexedProperty struct { + ColumnName string `json:"column"` + Property string `json:"property"` + Type string `json:"type"` +} + +// SQLServer defines a MS SQL Server based state store. +type SQLServer struct { + state.BulkStore + + metadata sqlServerMetadata + + migratorFactory func(*sqlServerMetadata) migrator + + itemRefTableTypeName string + upsertCommand string + getCommand string + deleteWithETagCommand string + deleteWithoutETagCommand string + + features []state.Feature + logger logger.Logger + db *sql.DB + gc commonsql.GarbageCollector +} + +// Init initializes the SQL server state store. 
+func (s *SQLServer) Init(ctx context.Context, metadata state.Metadata) error { + s.metadata = newMetadata() + metadata.Base.GetProperty() + err := s.metadata.Parse(metadata.Properties) + if err != nil { + return err + } + + migration := s.migratorFactory(&s.metadata) + mr, err := migration.executeMigrations(ctx) + if err != nil { + return err + } + + s.itemRefTableTypeName = mr.itemRefTableTypeName + s.upsertCommand = mr.upsertProcFullName + s.getCommand = mr.getCommand + s.deleteWithETagCommand = mr.deleteWithETagCommand + s.deleteWithoutETagCommand = mr.deleteWithoutETagCommand + + conn, _, err := s.metadata.GetConnector(true) + if err != nil { + return err + } + s.db = sql.OpenDB(conn) + + if s.metadata.CleanupInterval != nil { + err = s.startGC() + if err != nil { + return err + } + } + + return nil +} + +func (s *SQLServer) startGC() error { + gc, err := commonsql.ScheduleGarbageCollector(commonsql.GCOptions{ + Logger: s.logger, + UpdateLastCleanupQuery: func(arg any) (string, any) { + return fmt.Sprintf(`BEGIN TRANSACTION; +BEGIN TRY +INSERT INTO [%[1]s].[%[2]s] ([Key], [Value]) VALUES ('last-cleanup', CONVERT(nvarchar(MAX), GETDATE(), 21)); +END TRY +BEGIN CATCH +UPDATE [%[1]s].[%[2]s] SET [Value] = CONVERT(nvarchar(MAX), GETDATE(), 21) WHERE [Key] = 'last-cleanup' AND Datediff_big(MS, [Value], GETUTCDATE()) > @Interval +END CATCH +COMMIT TRANSACTION;`, s.metadata.SchemaName, s.metadata.MetadataTableName), sql.Named("Interval", arg) + }, + DeleteExpiredValuesQuery: fmt.Sprintf( + `DELETE FROM [%s].[%s] WHERE [ExpireDate] IS NOT NULL AND [ExpireDate] < GETDATE()`, + s.metadata.SchemaName, s.metadata.TableName, + ), + CleanupInterval: *s.metadata.CleanupInterval, + DB: commonsql.AdaptDatabaseSQLConn(s.db), + }) + if err != nil { + return err + } + s.gc = gc + + return nil +} + +// Features returns the features available in this state store. 
+func (s *SQLServer) Features() []state.Feature { + return s.features +} + +// Multi performs batched updates on a SQL Server store. +func (s *SQLServer) Multi(ctx context.Context, request *state.TransactionalStateRequest) error { + if request == nil { + return nil + } + + // If there's only 1 operation, skip starting a transaction + switch len(request.Operations) { + case 0: + return nil + case 1: + return s.execMultiOperation(ctx, request.Operations[0], s.db) + default: + _, err := sqltransactions.ExecuteInTransaction(ctx, s.logger, s.db, func(ctx context.Context, tx *sql.Tx) (r struct{}, err error) { + for _, op := range request.Operations { + err = s.execMultiOperation(ctx, op, tx) + if err != nil { + return r, err + } + } + return r, nil + }) + return err + } +} + +func (s *SQLServer) execMultiOperation(ctx context.Context, op state.TransactionalStateOperation, db dbExecutor) error { + switch req := op.(type) { + case state.SetRequest: + return s.executeSet(ctx, db, &req) + case state.DeleteRequest: + return s.executeDelete(ctx, db, &req) + default: + return fmt.Errorf("unsupported operation: %s", op.Operation()) + } +} + +// Delete removes an entity from the store. 
+func (s *SQLServer) Delete(ctx context.Context, req *state.DeleteRequest) error { + return s.executeDelete(ctx, s.db, req) +} + +func (s *SQLServer) executeDelete(ctx context.Context, db dbExecutor, req *state.DeleteRequest) error { + var err error + var res sql.Result + if req.HasETag() { + var b []byte + b, err = hex.DecodeString(*req.ETag) + if err != nil { + return state.NewETagError(state.ETagInvalid, err) + } + + res, err = db.ExecContext(ctx, s.deleteWithETagCommand, sql.Named(keyColumnName, req.Key), sql.Named(rowVersionColumnName, b)) + } else { + res, err = db.ExecContext(ctx, s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key)) + } + + // err represents errors thrown by the stored procedure or the database itself + if err != nil { + return err + } + + // if the row with matching key (and ETag if specified) is not found, then the stored procedure returns 0 rows affected + rows, err := res.RowsAffected() + if err != nil { + return err + } + + // When an ETAG is specified, a row must have been deleted or else we return an ETag mismatch error + if rows != 1 && req.ETag != nil && *req.ETag != "" { + return state.NewETagError(state.ETagMismatch, nil) + } + + // successful deletion, or noop if no ETAG specified + return nil +} + +// Get returns an entity from store. 
+func (s *SQLServer) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { + rows, err := s.db.QueryContext(ctx, s.getCommand, sql.Named(keyColumnName, req.Key)) + if err != nil { + return nil, err + } + + if rows.Err() != nil { + return nil, rows.Err() + } + + defer rows.Close() + + if !rows.Next() { + return &state.GetResponse{}, nil + } + + var ( + data sql.NullString + binaryData []byte + isBinary bool + rowVersion []byte + expireDate sql.NullTime + ) + err = rows.Scan(&data, &binaryData, &isBinary, &rowVersion, &expireDate) + if err != nil { + return nil, err + } + + etag := hex.EncodeToString(rowVersion) + + var metadata map[string]string + if expireDate.Valid { + metadata = map[string]string{ + state.GetRespMetaKeyTTLExpireTime: expireDate.Time.UTC().Format(time.RFC3339), + } + } + + var bytes []byte + if isBinary { + bytes = binaryData + } else { + if !data.Valid { + return nil, errors.New("unexpected error: no item was found") + } + bytes = []byte(data.String) + } + + return &state.GetResponse{ + Data: bytes, + ETag: ptr.Of(etag), + Metadata: metadata, + }, nil +} + +// Set adds/updates an entity on store. +func (s *SQLServer) Set(ctx context.Context, req *state.SetRequest) error { + return s.executeSet(ctx, s.db, req) +} + +// dbExecutor implements a common functionality implemented by db or tx. 
+type dbExecutor interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) +} + +func (s *SQLServer) executeSet(ctx context.Context, db dbExecutor, req *state.SetRequest) error { + bytes, isBinary := req.Value.([]byte) + namedData := sql.Named("Data", nil) + namedBinaryData := sql.Named("BinaryData", nil) + if !isBinary { + bt, err := json.Marshal(req.Value) + if err != nil { + return err + } + namedData = sql.Named("Data", string(bt)) + } else { + namedBinaryData = sql.Named("BinaryData", bytes) + } + + etag := sql.Named(rowVersionColumnName, nil) + if req.HasETag() { + b, err := hex.DecodeString(*req.ETag) + if err != nil { + return state.NewETagError(state.ETagInvalid, err) + } + etag = sql.Named(rowVersionColumnName, b) + } + + ttl, ttlerr := utils.ParseTTL(req.Metadata) + if ttlerr != nil { + return fmt.Errorf("error parsing TTL: %w", ttlerr) + } + + var res sql.Result + var err error + if req.Options.Concurrency == state.FirstWrite { + res, err = db.ExecContext(ctx, s.upsertCommand, + sql.Named(keyColumnName, req.Key), + namedData, + etag, + namedBinaryData, + sql.Named("isBinary", isBinary), + sql.Named("FirstWrite", 1), + sql.Named("TTL", ttl)) + } else { + res, err = db.ExecContext(ctx, s.upsertCommand, + sql.Named(keyColumnName, req.Key), + namedData, + etag, + namedBinaryData, + sql.Named("isBinary", isBinary), + sql.Named("FirstWrite", 0), + sql.Named("TTL", ttl)) + } + + if err != nil { + return err + } + + rows, err := res.RowsAffected() + if err != nil { + return err + } + + if rows != 1 { + if req.HasETag() { + return state.NewETagError(state.ETagMismatch, err) + } + return errors.New("no item was updated") + } + + return nil +} + +func (s *SQLServer) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { + settingsStruct := sqlServerMetadata{} + metadata.GetMetadataInfoFromStructType(reflect.TypeOf(settingsStruct), &metadataInfo, metadata.StateStoreType) + return +} + +// Close implements io.Closer. 
+func (s *SQLServer) Close() error { + if s.db != nil { + s.db.Close() + s.db = nil + } + + if s.gc != nil { + return s.gc.Close() + } + + return nil +} + +// GetCleanupInterval returns the cleanupInterval property. +// This is primarily used for tests. +func (s *SQLServer) GetCleanupInterval() *time.Duration { + return s.metadata.CleanupInterval +} + +func (s *SQLServer) CleanupExpired() error { + if s.gc != nil { + return s.gc.CleanupExpired() + } + return nil +} diff --git a/state/sqlserver/v2/sqlserver_integration_test.go b/state/sqlserver/v2/sqlserver_integration_test.go new file mode 100644 index 0000000000..230d1a5bed --- /dev/null +++ b/state/sqlserver/v2/sqlserver_integration_test.go @@ -0,0 +1,661 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sqlserver + +import ( + "crypto/rand" + "database/sql" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "math" + "os" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/dapr/components-contrib/common/proto/state/sqlserver" + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + // connectionStringEnvKey defines the key containing the integration test connection string + // To use docker, server=localhost;user id=sa;password=Pass@Word1;port=1433; + // To use Azure SQL, server=.database.windows.net;User id=;port=1433;password=;database=dapr_test;. + connectionStringEnvKey = "DAPR_TEST_SQL_CONNSTRING" + usersTableName = "Users" + beverageTea = "tea" + invalidEtag = "FFFFFFFFFFFFFFFF" +) + +type user struct { + ID string + Name string + FavoriteBeverage string +} + +type userWithPets struct { + user + PetsCount int +} + +type userWithEtag struct { + user + etag string +} + +func TestIntegrationCases(t *testing.T) { + connectionString := os.Getenv(connectionStringEnvKey) + if connectionString == "" { + t.Skipf(`SQLServer state integration tests skipped. 
To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="server=localhost;user id=sa;password=Pass@Word1;port=1433;")'`, connectionStringEnvKey) + } + + t.Run("Single operations", testSingleOperations) + t.Run("Set New Record With Invalid Etag Should Fail", testSetNewRecordWithInvalidEtagShouldFail) + t.Run("Indexed Properties", testIndexedProperties) + t.Run("Multi operations", testMultiOperations) + t.Run("Insert and Update Set Record Dates", testInsertAndUpdateSetRecordDates) + t.Run("Multiple initializations", testMultipleInitializations) + t.Run("Should preserve byte data when not base64 encoded", testNonBase64ByteData) + + // Run concurrent set tests 10 times + const executions = 10 + for i := range executions { + t.Run(fmt.Sprintf("Concurrent sets, try #%d", i+1), testConcurrentSets) + } +} + +func getUniqueDBSchema(t *testing.T) string { + b := make([]byte, 4) + _, err := io.ReadFull(rand.Reader, b) + require.NoError(t, err) + return "v" + hex.EncodeToString(b) +} + +func createMetadata(schema string, kt KeyType, indexedProperties string) state.Metadata { + metadata := state.Metadata{Base: metadata.Base{ + Properties: map[string]string{ + "connectionString": os.Getenv(connectionStringEnvKey), + "schema": schema, + "tableName": usersTableName, + "keyType": string(kt), + "databaseName": "dapr_test", + }, + }} + + if indexedProperties != "" { + metadata.Properties["indexedProperties"] = indexedProperties + } + + return metadata +} + +// Ensure the database is running +// For docker, use: docker run --name sqlserver -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=Pass@Word1" -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04. 
+// For azure-sql-edge use: +// docker volume create sqlvolume +// docker run --name sqlserver -e "ACCEPT_EULA=Y" -e "MSSQL_SA_PASSWORD=Pass@Word1" -e "MSSQL_PID=Developer" -e "MSSQL_AGENT_ENABLED=TRUE" -e "MSSQL_COLLATION=SQL_Latin1_General_CP1_CI_AS" -e "MSSQL_LCID=1033" -p 1433:1433 -v sqlvolume:/var/opt/mssql -d mcr.microsoft.com/azure-sql-edge:latest +func getTestStore(t *testing.T, indexedProperties string) *SQLServer { + return getTestStoreWithKeyType(t, StringKeyType, indexedProperties) +} + +func getTestStoreWithKeyType(t *testing.T, kt KeyType, indexedProperties string) *SQLServer { + schema := getUniqueDBSchema(t) + metadata := createMetadata(schema, kt, indexedProperties) + store := &SQLServer{ + logger: logger.NewLogger("test"), + migratorFactory: newMigration, + } + store.BulkStore = state.NewDefaultBulkStore(store) + err := store.Init(t.Context(), metadata) + require.NoError(t, err) + + return store +} + +func assertUserExists(t *testing.T, store *SQLServer, key string) (user, string) { + getRes, err := store.Get(t.Context(), &state.GetRequest{Key: key}) + require.NoError(t, err) + assert.NotNil(t, getRes) + assert.NotNil(t, getRes.Data, "No data was returned") + require.NotNil(t, getRes.ETag) + + var loaded user + err = json.Unmarshal(getRes.Data, &loaded) + require.NoError(t, err) + + return loaded, *getRes.ETag +} + +func assertLoadedUserIsEqual(t *testing.T, store *SQLServer, key string, expected user) (user, string) { + loaded, etag := assertUserExists(t, store, key) + assert.Equal(t, expected.ID, loaded.ID) + assert.Equal(t, expected.Name, loaded.Name) + assert.Equal(t, expected.FavoriteBeverage, loaded.FavoriteBeverage) + + return loaded, etag +} + +func assertUserDoesNotExist(t *testing.T, store *SQLServer, key string) { + _, err := store.Get(t.Context(), &state.GetRequest{Key: key}) + require.NoError(t, err) +} + +func assertDBQuery(t *testing.T, store *SQLServer, query string, assertReader func(t *testing.T, rows *sql.Rows)) { + rows, err 
:= store.db.Query(query) + require.NoError(t, err) + require.NoError(t, rows.Err()) + + defer rows.Close() + assertReader(t, rows) +} + +/* #nosec. */ +func assertUserCountIsEqualTo(t *testing.T, store *SQLServer, expected int) { + tsql := fmt.Sprintf("SELECT count(*) FROM [%s].[%s]", store.metadata.SchemaName, store.metadata.TableName) + assertDBQuery(t, store, tsql, func(t *testing.T, rows *sql.Rows) { + assert.True(t, rows.Next()) + var actual int + err := rows.Scan(&actual) + require.NoError(t, err) + assert.Equal(t, expected, actual) + }) +} + +type userKeyGenerator interface { + NextKey() string +} + +type numbericKeyGenerator struct { + seed int32 +} + +func (n *numbericKeyGenerator) NextKey() string { + val := atomic.AddInt32(&n.seed, 1) + + return strconv.Itoa(int(val)) +} + +type uuidKeyGenerator struct{} + +func (n uuidKeyGenerator) NextKey() string { + return uuid.New().String() +} + +func testSingleOperations(t *testing.T) { + invEtag := invalidEtag + + tests := []struct { + name string + kt KeyType + keyGen userKeyGenerator + }{ + {"Single operation string key type", StringKeyType, &numbericKeyGenerator{}}, + {"Single operation integer key type", IntegerKeyType, &numbericKeyGenerator{}}, + {"Single operation uuid key type", UUIDKeyType, &uuidKeyGenerator{}}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := getTestStoreWithKeyType(t, test.kt, "") + + john := user{test.keyGen.NextKey(), "John", "Coffee"} + + // Get fails as the item does not exist + assertUserDoesNotExist(t, store, john.ID) + + // Save and read + err := store.Set(t.Context(), &state.SetRequest{Key: john.ID, Value: john}) + require.NoError(t, err) + johnV1, etagFromInsert := assertLoadedUserIsEqual(t, store, john.ID, john) + + // Update with ETAG + waterJohn := johnV1 + waterJohn.FavoriteBeverage = "Water" + err = store.Set(t.Context(), &state.SetRequest{Key: waterJohn.ID, Value: waterJohn, ETag: &etagFromInsert}) + require.NoError(t, err) + + // Get 
updated + johnV2, _ := assertLoadedUserIsEqual(t, store, waterJohn.ID, waterJohn) + + // Update without ETAG + noEtagJohn := johnV2 + noEtagJohn.FavoriteBeverage = "No Etag John" + err = store.Set(t.Context(), &state.SetRequest{Key: noEtagJohn.ID, Value: noEtagJohn}) + require.NoError(t, err) + + // 7. Get updated + johnV3, _ := assertLoadedUserIsEqual(t, store, noEtagJohn.ID, noEtagJohn) + + // 8. Update with invalid ETAG should fail + failedJohn := johnV3 + failedJohn.FavoriteBeverage = "Will not work" + err = store.Set(t.Context(), &state.SetRequest{Key: failedJohn.ID, Value: failedJohn, ETag: &etagFromInsert}) + require.Error(t, err) + _, etag := assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3) + + // 9. Delete with invalid ETAG should fail + err = store.Delete(t.Context(), &state.DeleteRequest{Key: johnV3.ID, ETag: &invEtag}) + require.Error(t, err) + assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3) + + // 10. Delete with valid ETAG + err = store.Delete(t.Context(), &state.DeleteRequest{Key: johnV2.ID, ETag: &etag}) + require.NoError(t, err) + + assertUserDoesNotExist(t, store, johnV2.ID) + }) + } +} + +func testSetNewRecordWithInvalidEtagShouldFail(t *testing.T) { + store := getTestStore(t, "") + + u := user{uuid.New().String(), "John", "Coffee"} + + invEtag := invalidEtag + err := store.Set(t.Context(), &state.SetRequest{Key: u.ID, Value: u, ETag: &invEtag}) + require.Error(t, err) +} + +/* #nosec. 
*/ +func testIndexedProperties(t *testing.T) { + store := getTestStore(t, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}, { "column":"PetsCount", "property":"PetsCount", "type": "INTEGER"}]`) + + err := store.BulkSet(t.Context(), []state.SetRequest{ + {Key: "1", Value: userWithPets{user{"1", "John", "Coffee"}, 3}}, + {Key: "2", Value: userWithPets{user{"2", "Laura", "Water"}, 1}}, + {Key: "3", Value: userWithPets{user{"3", "Carl", "Beer"}, 0}}, + {Key: "4", Value: userWithPets{user{"4", "Maria", "Wine"}, 100}}, + }, state.BulkStoreOpts{}) + + require.NoError(t, err) + + // Check the database for computed columns + assertDBQuery(t, store, fmt.Sprintf("SELECT count(*) from [%s].[%s] WHERE PetsCount < 3", store.metadata.SchemaName, usersTableName), func(t *testing.T, rows *sql.Rows) { + assert.True(t, rows.Next()) + + var c int + rows.Scan(&c) + assert.Equal(t, 2, c) + }) + + // Ensure we can get by beverage + assertDBQuery(t, store, fmt.Sprintf("SELECT count(*) from [%s].[%s] WHERE FavoriteBeverage = '%s'", store.metadata.SchemaName, usersTableName, "Coffee"), func(t *testing.T, rows *sql.Rows) { + assert.True(t, rows.Next()) + + var c int + rows.Scan(&c) + assert.Equal(t, 1, c) + }) +} + +func testMultiOperations(t *testing.T) { + tests := []struct { + name string + kt KeyType + keyGen userKeyGenerator + }{ + {"Multi operations string key type", StringKeyType, &numbericKeyGenerator{}}, + {"Multi operations integer key type", IntegerKeyType, &numbericKeyGenerator{}}, + {"Multi operations uuid key type", UUIDKeyType, &uuidKeyGenerator{}}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := getTestStoreWithKeyType(t, test.kt, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}]`) + + keyGen := test.keyGen + + initialUsers := []user{ + {keyGen.NextKey(), "John", "Coffee"}, + {keyGen.NextKey(), "Laura", "Water"}, + {keyGen.NextKey(), "Carl", "Beer"}, + 
{keyGen.NextKey(), "Maria", "Wine"}, + {keyGen.NextKey(), "Mark", "Juice"}, + {keyGen.NextKey(), "Sara", "Soda"}, + {keyGen.NextKey(), "Tony", "Milk"}, + {keyGen.NextKey(), "Hugo", "Juice"}, + } + + // 1. add bulk users + bulkSet := make([]state.SetRequest, len(initialUsers)) + for i, u := range initialUsers { + bulkSet[i] = state.SetRequest{Key: u.ID, Value: u} + } + + err := store.BulkSet(t.Context(), bulkSet, state.BulkStoreOpts{}) + require.NoError(t, err) + assertUserCountIsEqualTo(t, store, len(initialUsers)) + + // Ensure initial users are correctly stored + loadedUsers := make([]userWithEtag, len(initialUsers)) + for i, u := range initialUsers { + loaded, etag := assertLoadedUserIsEqual(t, store, u.ID, u) + loadedWithEtag := userWithEtag{loaded, etag} + loadedUsers[i] = loadedWithEtag + } + totalUsers := len(loadedUsers) + + userIndex := 0 + t.Run("Update and delete without etag should work", func(t *testing.T) { + toDelete := loadedUsers[userIndex].user + original := loadedUsers[userIndex+1] + modified := original.user + modified.FavoriteBeverage = beverageTea + + localErr := store.Multi(t.Context(), &state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + state.DeleteRequest{Key: toDelete.ID}, + state.SetRequest{Key: modified.ID, Value: modified}, + }, + }) + require.NoError(t, localErr) + assertLoadedUserIsEqual(t, store, modified.ID, modified) + assertUserDoesNotExist(t, store, toDelete.ID) + + totalUsers-- + assertUserCountIsEqualTo(t, store, totalUsers) + + userIndex += 2 + }) + + t.Run("Update, delete and insert should work", func(t *testing.T) { + toDelete := loadedUsers[userIndex] + toModify := loadedUsers[userIndex+1] + toInsert := user{keyGen.NextKey(), "Susan", "Soda"} + modified := toModify.user + modified.FavoriteBeverage = beverageTea + + err = store.Multi(t.Context(), &state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + state.DeleteRequest{Key: toDelete.ID, ETag: 
&toDelete.etag}, + state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag}, + state.SetRequest{Key: toInsert.ID, Value: toInsert}, + }, + }) + require.NoError(t, err) + assertLoadedUserIsEqual(t, store, modified.ID, modified) + assertLoadedUserIsEqual(t, store, toInsert.ID, toInsert) + assertUserDoesNotExist(t, store, toDelete.ID) + + // we added 1 and deleted 1, so totalUsers should have no change + assertUserCountIsEqualTo(t, store, totalUsers) + + userIndex += 2 + }) + + t.Run("Update and delete with etag should work", func(t *testing.T) { + toDelete := loadedUsers[userIndex] + toModify := loadedUsers[userIndex+1] + modified := toModify.user + modified.FavoriteBeverage = beverageTea + + err = store.Multi(t.Context(), &state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag}, + state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag}, + }, + }) + require.NoError(t, err) + assertLoadedUserIsEqual(t, store, modified.ID, modified) + assertUserDoesNotExist(t, store, toDelete.ID) + + totalUsers-- + assertUserCountIsEqualTo(t, store, totalUsers) + + userIndex += 2 + }) + + t.Run("Delete fails, should abort insert", func(t *testing.T) { + toDelete := loadedUsers[userIndex] + toInsert := user{keyGen.NextKey(), "Wont-be-inserted", "Beer"} + + invEtag := invalidEtag + err = store.Multi(t.Context(), &state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag}, + state.SetRequest{Key: toInsert.ID, Value: toInsert}, + }, + }) + + require.Error(t, err) + assertUserDoesNotExist(t, store, toInsert.ID) + assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user) + + assertUserCountIsEqualTo(t, store, totalUsers) + }) + + t.Run("Delete fails, should abort update", func(t *testing.T) { + toDelete := loadedUsers[userIndex] + toModify := loadedUsers[userIndex+1] + modified := 
toModify.user + modified.FavoriteBeverage = beverageTea + + invEtag := invalidEtag + err = store.Multi(t.Context(), &state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag}, + state.SetRequest{Key: modified.ID, Value: modified}, + }, + }) + require.Error(t, err) + assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user) + assertLoadedUserIsEqual(t, store, toModify.ID, toModify.user) + + assertUserCountIsEqualTo(t, store, totalUsers) + }) + + t.Run("Update fails, should abort delete", func(t *testing.T) { + toDelete := loadedUsers[userIndex] + toModify := loadedUsers[userIndex+1] + modified := toModify.user + modified.FavoriteBeverage = beverageTea + + invEtag := invalidEtag + err = store.Multi(t.Context(), &state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + state.DeleteRequest{Key: toDelete.ID}, + state.SetRequest{Key: modified.ID, Value: modified, ETag: &invEtag}, + }, + }) + + require.Error(t, err) + assertLoadedUserIsEqual(t, store, toDelete.ID, toDelete.user) + assertLoadedUserIsEqual(t, store, toModify.ID, toModify.user) + + assertUserCountIsEqualTo(t, store, totalUsers) + }) + }) + } +} + +/* #nosec. 
*/ +func testInsertAndUpdateSetRecordDates(t *testing.T) { + const maxDiffInMs = float64(500) + store := getTestStore(t, "") + + u := user{"1", "John", "Coffee"} + err := store.Set(t.Context(), &state.SetRequest{Key: u.ID, Value: u}) + require.NoError(t, err) + + var originalInsertTime time.Time + getUserTsql := fmt.Sprintf("SELECT [InsertDate], [UpdateDate] from [%s].[%s] WHERE [Key]='%s'", store.metadata.SchemaName, store.metadata.TableName, u.ID) + assertDBQuery(t, store, getUserTsql, func(t *testing.T, rows *sql.Rows) { + assert.True(t, rows.Next()) + + var insertDate, updateDate sql.NullTime + localErr := rows.Scan(&insertDate, &updateDate) + require.NoError(t, localErr) + + assert.True(t, insertDate.Valid) + insertDiff := float64(time.Now().UTC().Sub(insertDate.Time).Milliseconds()) + assert.LessOrEqual(t, math.Abs(insertDiff), maxDiffInMs) + assert.False(t, updateDate.Valid) + + originalInsertTime = insertDate.Time + }) + + modified := u + modified.FavoriteBeverage = beverageTea + err = store.Set(t.Context(), &state.SetRequest{Key: modified.ID, Value: modified}) + require.NoError(t, err) + assertDBQuery(t, store, getUserTsql, func(t *testing.T, rows *sql.Rows) { + assert.True(t, rows.Next()) + + var insertDate, updateDate sql.NullTime + err := rows.Scan(&insertDate, &updateDate) + require.NoError(t, err) + + assert.True(t, insertDate.Valid) + assert.Equal(t, originalInsertTime, insertDate.Time) + + assert.True(t, updateDate.Valid) + updateDiff := float64(time.Now().UTC().Sub(updateDate.Time).Milliseconds()) + assert.LessOrEqual(t, math.Abs(updateDiff), maxDiffInMs) + }) +} + +func testConcurrentSets(t *testing.T) { + const parallelism = 10 + + store := getTestStore(t, "") + + u := user{"1", "John", "Coffee"} + err := store.Set(t.Context(), &state.SetRequest{Key: u.ID, Value: u}) + require.NoError(t, err) + + _, etag := assertLoadedUserIsEqual(t, store, u.ID, u) + + var wc sync.WaitGroup + start := make(chan bool, parallelism) + totalErrors := int32(0) + 
totalSucceeds := int32(0) + for range parallelism { + wc.Add(1) + go func(id, etag string, start <-chan bool, wc *sync.WaitGroup, store *SQLServer) { + <-start + + defer wc.Done() + + modified := user{"1", "John", beverageTea} + err := store.Set(t.Context(), &state.SetRequest{Key: id, Value: modified, ETag: &etag}) + if err != nil { + atomic.AddInt32(&totalErrors, 1) + } else { + atomic.AddInt32(&totalSucceeds, 1) + } + }(u.ID, etag, start, &wc, store) + } + + close(start) + wc.Wait() + + assert.Equal(t, int32(parallelism-1), totalErrors) + assert.Equal(t, int32(1), totalSucceeds) +} + +func testMultipleInitializations(t *testing.T) { + tests := []struct { + name string + kt KeyType + indexedProperties string + }{ + {"No indexed properties", StringKeyType, ""}, + {"With indexed properties", StringKeyType, `[{ "column":"FavoriteBeverage", "property":"FavoriteBeverage", "type":"nvarchar(100)"}, { "column":"PetsCount", "property":"PetsCount", "type": "INTEGER"}]`}, + {"No indexed properties uuid key type", UUIDKeyType, ""}, + {"No indexed properties integer key type", IntegerKeyType, ""}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := getTestStoreWithKeyType(t, test.kt, test.indexedProperties) + + store2 := &SQLServer{ + logger: logger.NewLogger("test"), + migratorFactory: newMigration, + } + store2.BulkStore = state.NewDefaultBulkStore(store2) + err := store2.Init(t.Context(), createMetadata(store.metadata.SchemaName, test.kt, test.indexedProperties)) + require.NoError(t, err) + }) + } +} + +func testNonBase64ByteData(t *testing.T) { + t.Run("Set And Get Proto", func(t *testing.T) { + store := getTestStore(t, "") + request := &sqlserver.TestEvent{ + EventId: -1, + } + requestBytes, err := proto.Marshal(request) + require.NoError(t, err) + require.NoError(t, store.Set(t.Context(), &state.SetRequest{Key: "1", Value: requestBytes})) + resp, err := store.Get(t.Context(), &state.GetRequest{Key: "1"}) + require.NoError(t, err) + + 
response := &sqlserver.TestEvent{} + err = proto.Unmarshal(resp.Data, response) + require.NoError(t, err) + + assert.EqualValues(t, request.GetEventId(), response.GetEventId()) + }) + + t.Run("Set And Get Json", func(t *testing.T) { + store := getTestStore(t, "") + request := &sqlserver.TestEvent{ + EventId: -1, + } + requestBytes, err := json.Marshal(request) + require.NoError(t, err) + require.NoError(t, store.Set(t.Context(), &state.SetRequest{Key: "1", Value: requestBytes})) + resp, err := store.Get(t.Context(), &state.GetRequest{Key: "1"}) + require.NoError(t, err) + + response := &sqlserver.TestEvent{} + err = json.Unmarshal(resp.Data, response) + require.NoError(t, err) + + assert.EqualValues(t, request.GetEventId(), response.GetEventId()) + }) + + t.Run("Set And Get Indexed Json", func(t *testing.T) { + store := getTestStore(t, `[{"column": "eventid", "property": "EventId", "type": "int"}]`) + request := &sqlserver.TestEvent{ + EventId: -1, + } + requestBytes, err := json.Marshal(request) + require.NoError(t, err) + require.NoError(t, store.Set(t.Context(), &state.SetRequest{Key: "1", Value: requestBytes})) + resp, err := store.Get(t.Context(), &state.GetRequest{Key: "1"}) + require.NoError(t, err) + + response := &sqlserver.TestEvent{} + err = json.Unmarshal(resp.Data, response) + require.NoError(t, err) + + assert.EqualValues(t, request.GetEventId(), response.GetEventId()) + }) +} diff --git a/state/sqlserver/v2/sqlserver_test.go b/state/sqlserver/v2/sqlserver_test.go new file mode 100644 index 0000000000..4f9c5117a5 --- /dev/null +++ b/state/sqlserver/v2/sqlserver_test.go @@ -0,0 +1,556 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqlserver + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/common/authentication/sqlserver" + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + "github.com/dapr/kit/logger" +) + +const ( + sampleConnectionString = "server=localhost;user id=sa;password=Pass@Word1;port=1433;database=sample;" + sampleUserTableName = "Users" + defaultDatabase = "dapr" + defaultSchema = "dbo" +) + +type mockMigrator struct{} + +func (m *mockMigrator) executeMigrations(context.Context) (migrationResult, error) { + r := migrationResult{} + + return r, nil +} + +type mockFailingMigrator struct{} + +func (m *mockFailingMigrator) executeMigrations(context.Context) (migrationResult, error) { + r := migrationResult{} + + return r, errors.New("migration failed") +} + +func TestValidConfiguration(t *testing.T) { + tests := map[string]struct { + props map[string]string + expected SQLServer + }{ + "No schema": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Custom schema": { + props: 
map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "schema": "mytest"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: "mytest", + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "String key type": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "keyType": "string"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Unique identifier key type": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "keyType": "uuid"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: UUIDKeyType, + keyLengthParsed: 0, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Integer identifier key type": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "keyType": "integer"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: IntegerKeyType, + 
keyLengthParsed: 0, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Custom key length": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "keyLength": "100"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: 100, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Single indexed property": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "indexedProperties": `[{"column": "Age","property":"age", "type":"int"}]`}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + indexedPropertiesParsed: []IndexedProperty{ + {ColumnName: "Age", Property: "age", Type: "int"}, + }, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Multiple indexed properties": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "indexedProperties": `[{"column": "Age","property":"age", "type":"int"}, {"column": "Name","property":"name", "type":"nvarchar(100)"}]`}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + indexedPropertiesParsed: []IndexedProperty{ + {ColumnName: "Age", Property: "age", Type: "int"}, + 
{ColumnName: "Name", Property: "name", Type: "nvarchar(100)"}, + }, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Custom database": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "databaseName": "dapr_test_table"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: "dapr_test_table", + SchemaName: defaultSchema, + }, + TableName: sampleUserTableName, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "No table": { + props: map[string]string{"connectionString": sampleConnectionString}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: defaultTable, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: defaultMetaTable, + }, + }, + }, + "Custom meta table": { + props: map[string]string{"connectionString": sampleConnectionString, "metadataTableName": "dapr_test_meta_table"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: defaultTable, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: "dapr_test_meta_table", + }, + }, + }, + "Actor state store true": { + props: map[string]string{"connectionString": sampleConnectionString, "metadataTableName": "dapr_test_meta_table", "actorStateStore": "true"}, + expected: SQLServer{ + metadata: sqlServerMetadata{ + SQLServerAuthMetadata: sqlserver.SQLServerAuthMetadata{ + ConnectionString: sampleConnectionString, + DatabaseName: 
defaultDatabase, + SchemaName: defaultSchema, + }, + TableName: defaultTable, + keyTypeParsed: StringKeyType, + keyLengthParsed: defaultKeyLength, + MetadataTableName: "dapr_test_meta_table", + }, + }, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + sqlStore := &SQLServer{ + logger: logger.NewLogger("test"), + migratorFactory: func(*sqlServerMetadata) migrator { + return &mockMigrator{} + }, + } + + metadata := state.Metadata{ + Base: metadata.Base{Properties: tt.props}, + } + + err := sqlStore.Init(t.Context(), metadata) + require.NoError(t, err) + assert.Equal(t, tt.expected.metadata.ConnectionString, sqlStore.metadata.ConnectionString) + assert.Equal(t, tt.expected.metadata.TableName, sqlStore.metadata.TableName) + assert.Equal(t, tt.expected.metadata.SchemaName, sqlStore.metadata.SchemaName) + assert.Equal(t, tt.expected.metadata.keyTypeParsed, sqlStore.metadata.keyTypeParsed) + assert.Equal(t, tt.expected.metadata.keyLengthParsed, sqlStore.metadata.keyLengthParsed) + assert.Equal(t, tt.expected.metadata.DatabaseName, sqlStore.metadata.DatabaseName) + assert.Equal(t, tt.expected.metadata.MetadataTableName, sqlStore.metadata.MetadataTableName) + + assert.Equal(t, len(tt.expected.metadata.indexedPropertiesParsed), len(sqlStore.metadata.indexedPropertiesParsed)) + if len(tt.expected.metadata.indexedPropertiesParsed) > 0 && len(tt.expected.metadata.indexedPropertiesParsed) == len(sqlStore.metadata.indexedPropertiesParsed) { + for i, e := range tt.expected.metadata.indexedPropertiesParsed { + assert.Equal(t, e.ColumnName, sqlStore.metadata.indexedPropertiesParsed[i].ColumnName) + assert.Equal(t, e.Property, sqlStore.metadata.indexedPropertiesParsed[i].Property) + assert.Equal(t, e.Type, sqlStore.metadata.indexedPropertiesParsed[i].Type) + } + } + }) + } +} + +func TestInvalidConfiguration(t *testing.T) { + tests := map[string]struct { + props map[string]string + expectedErr string + }{ + "Empty": { + props: map[string]string{}, + 
expectedErr: "missing connection string", + }, + "Empty connection string": { + props: map[string]string{"connectionString": ""}, + expectedErr: "missing connection string", + }, + "Negative maxKeyLength value": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "keyLength": "-1"}, + expectedErr: "invalid key length value of -1", + }, + "Indexes properties are not valid json": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": "no_json"}, + expectedErr: "invalid character", + }, + "Invalid table name with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test;"}, + expectedErr: "invalid table name", + }, + "Invalid table name with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test GO DROP DATABASE dapr_test"}, + expectedErr: "invalid table name", + }, + "Invalid metadata table name with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "metadataTableName": "test;"}, + expectedErr: "invalid metadata table name", + }, + "Invalid metadata table name with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "metadataTableName": "test GO DROP DATABASE dapr_test"}, + expectedErr: "invalid metadata table name", + }, + "Invalid schema name with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "schema": "test;"}, + expectedErr: "invalid schema name", + }, + "Invalid schema name with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "schema": "test GO DROP DATABASE dapr_test"}, + expectedErr: "invalid schema name", + }, + "Invalid index property column name with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": 
`[{"column":"test;", "property": "age", "type": "INT"}]`}, + expectedErr: "invalid indexed property column name", + }, + "Invalid index property column name with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"test GO DROP DATABASE dapr_test", "property": "age", "type": "INT"}]`}, + expectedErr: "invalid indexed property column name", + }, + "Invalid index property name with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"age", "property": "test;", "type": "INT"}]`}, + expectedErr: "invalid indexed property name", + }, + "Invalid index property name with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"age", "property": "test GO DROP DATABASE dapr_test", "type": "INT"}]`}, + expectedErr: "invalid indexed property name", + }, + "Invalid index property type with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"age", "property": "age", "type": "INT;"}]`}, + expectedErr: "invalid indexed property type", + }, + "Invalid index property type with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"age", "property": "age", "type": "INT GO DROP DATABASE dapr_test"}]`}, + expectedErr: "invalid indexed property type", + }, + "Index property column cannot be empty": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"", "property": "age", "type": "INT"}]`}, + expectedErr: "indexed property column cannot be empty", + }, + "Invalid property name cannot be empty": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": 
`[{"column":"age", "property": "", "type": "INT"}]`}, + expectedErr: "indexed property name cannot be empty", + }, + "Invalid property type cannot be empty": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "indexedProperties": `[{"column":"age", "property": "age", "type": ""}]`}, + expectedErr: "indexed property type cannot be empty", + }, + "Invalid database name with ;": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "databaseName": "test;"}, + expectedErr: "invalid database name", + }, + "Invalid database name with space": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "databaseName": "test GO DROP DATABASE dapr_test"}, + expectedErr: "invalid database name", + }, + "Invalid key type invalid": { + props: map[string]string{"connectionString": sampleConnectionString, "tableName": "test", "keyType": "invalid"}, + expectedErr: "invalid key type", + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + sqlStore := &SQLServer{ + logger: logger.NewLogger("test"), + } + + metadata := state.Metadata{ + Base: metadata.Base{Properties: tt.props}, + } + + err := sqlStore.Init(t.Context(), metadata) + require.Error(t, err) + + if tt.expectedErr != "" { + require.ErrorContains(t, err, tt.expectedErr) + } + }) + } +} + +func TestCleanupInterval(t *testing.T) { + t.Run("cleanupInterval not set", func(t *testing.T) { + properties := map[string]string{ + "url": "test", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + assert.Equal(t, "test", md.ConnectionString) + require.NotNil(t, md.CleanupInterval) + assert.Equal(t, defaultCleanupInterval, *md.CleanupInterval) + }) + + t.Run("cleanupInterval as Go duration", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupInterval": "1m", + } + + md := newMetadata() + err := md.Parse(properties) + 
require.NoError(t, err) + assert.Equal(t, "test", md.ConnectionString) + require.NotNil(t, md.CleanupInterval) + assert.Equal(t, time.Minute, *md.CleanupInterval) + }) + + t.Run("cleanupInterval as seconds", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupInterval": "10", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + assert.Equal(t, "test", md.ConnectionString) + require.NotNil(t, md.CleanupInterval) + assert.Equal(t, 10*time.Second, *md.CleanupInterval) + }) + + t.Run("cleanupIntervalInSeconds as Go duration", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupIntervalInSeconds": "1m", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + require.NotNil(t, md.CleanupInterval) + assert.Equal(t, time.Minute, *md.CleanupInterval) + }) + + t.Run("cleanupIntervalInSeconds as seconds", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupIntervalInSeconds": "10", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + require.NotNil(t, md.CleanupInterval) + assert.Equal(t, 10*time.Second, *md.CleanupInterval) + }) + + t.Run("cleanupInterval as 0", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupInterval": "0", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + require.Nil(t, md.CleanupInterval) + }) + + t.Run("cleanupIntervallInSeconds as 0", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupIntervalInSeconds": "0", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + require.Nil(t, md.CleanupInterval) + }) + + t.Run("cleanupInterval negative", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupInterval": "-1", + } + + md := 
newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + require.Nil(t, md.CleanupInterval) + }) + + t.Run("cleanupIntervallInSeconds negative", func(t *testing.T) { + properties := map[string]string{ + "connectionString": "test", + "cleanupIntervalInSeconds": "-1", + } + + md := newMetadata() + err := md.Parse(properties) + require.NoError(t, err) + require.Nil(t, md.CleanupInterval) + }) +} + +// Test that if the migration fails the error is reported. +func TestExecuteMigrationFails(t *testing.T) { + sqlStore := &SQLServer{ + logger: logger.NewLogger("test"), + migratorFactory: func(*sqlServerMetadata) migrator { + return &mockFailingMigrator{} + }, + } + + metadata := state.Metadata{ + Base: metadata.Base{Properties: map[string]string{"connectionString": sampleConnectionString, "tableName": sampleUserTableName, "databaseName": "dapr_test_table"}}, + } + + err := sqlStore.Init(t.Context(), metadata) + require.Error(t, err) +} + +func TestSupportedFeatures(t *testing.T) { + sqlStore := &SQLServer{ + features: []state.Feature{state.FeatureETag, state.FeatureTransactional}, + logger: logger.NewLogger("test"), + } + + actual := sqlStore.Features() + assert.NotNil(t, actual) + assert.Equal(t, state.FeatureETag, actual[0]) + assert.Equal(t, state.FeatureTransactional, actual[1]) +} diff --git a/tests/certification/state/sqlserver/v2/README.md b/tests/certification/state/sqlserver/v2/README.md new file mode 100644 index 0000000000..88aba143b8 --- /dev/null +++ b/tests/certification/state/sqlserver/v2/README.md @@ -0,0 +1,46 @@ +# SQL Server certification testing + +This project aims to test the SQL Server State Store component under various conditions. 
+ +## Test plan + +### SQL Injection + +* Not prone to SQL injection on write +* Not prone to SQL injection on read +* Not prone to SQL injection on delete + +### Indexed Properties + +* Verifies indices are created for each indexed property in component metadata +* Verifies JSON data properties are parsed and written to dedicated database columns + +### Custom Properties + +* Verifies the use of a custom table name (default is states) +* Verifies the use of a custom schema (default is dbo) + +### Connection to different SQL Server types + +* Verifies connection handling with Azure SQL Server +* Verifies connection handling with SQL Server in Docker to represent self-hosted SQL Server options + +## TTLs and cleanups + +1. Correctly parse the `cleanupIntervalInSeconds` metadata property: + - No value uses the default value (3600 seconds) + - A positive value sets the interval to the given number of seconds + - A zero or negative value disables the cleanup +2. The cleanup method deletes expired records and updates the metadata table with the last time it ran +3. The cleanup method doesn't run if the last iteration was less than `cleanupIntervalInSeconds` or if another process is doing the cleanup + +### Other tests + +* Client reconnects (if applicable) upon network interruption + + +### Running the tests + +This must be run in the GitHub Actions Workflow configured for test infrastructure setup. + +If you have access to an Azure subscription you can run this locally on Mac or Linux after running `setup-azure-conf-test.sh` in `.github/infrastructure/conformance/azure` and then sourcing the generated bash rc file. 
diff --git a/tests/certification/state/sqlserver/v2/components/azure/localsecrets.yaml b/tests/certification/state/sqlserver/v2/components/azure/localsecrets.yaml new file mode 100644 index 0000000000..94bb7a2643 --- /dev/null +++ b/tests/certification/state/sqlserver/v2/components/azure/localsecrets.yaml @@ -0,0 +1,9 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: envvar-secret-store + namespace: default +spec: + type: secretstores.local.env + version: v1 + metadata: diff --git a/tests/certification/state/sqlserver/v2/components/azure/sqlserver.yaml b/tests/certification/state/sqlserver/v2/components/azure/sqlserver.yaml new file mode 100644 index 0000000000..10b38f35df --- /dev/null +++ b/tests/certification/state/sqlserver/v2/components/azure/sqlserver.yaml @@ -0,0 +1,24 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: dapr-state-store +spec: + type: state.sqlserver + metadata: + - name: connectionString + secretKeyRef: + name: AzureSqlServerConnectionString + value: AzureSqlServerConnectionString + - name: databaseName + value: stablecertification_v2 + - name: tableName + value: dapr_certification_test + - name: keyType + value: string + - name: keyLength + value: 120 + - name: schema + value: proto + +auth: + secretStore: envvar-secret-store diff --git a/tests/certification/state/sqlserver/v2/components/docker/customschemawithindex/sqlserver.yaml b/tests/certification/state/sqlserver/v2/components/docker/customschemawithindex/sqlserver.yaml new file mode 100644 index 0000000000..69d51f0cf3 --- /dev/null +++ b/tests/certification/state/sqlserver/v2/components/docker/customschemawithindex/sqlserver.yaml @@ -0,0 +1,18 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: dapr-state-store +spec: + type: state.sqlserver + metadata: + - name: url + value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;Connection Timeout=30;" + - name: databaseName + value: certificationtest_v2 + - name: schemaName + 
value: customschema + - name: tableName + value: mystates + - name: indexedProperties + value: '[{"column": "transactionid", "property": "id", "type": "int"}, {"column": "customerid", "property": "customer", "type": "nvarchar(100)"}]' + diff --git a/tests/certification/state/sqlserver/v2/components/docker/default/sqlserver.yaml b/tests/certification/state/sqlserver/v2/components/docker/default/sqlserver.yaml new file mode 100644 index 0000000000..7f962a4667 --- /dev/null +++ b/tests/certification/state/sqlserver/v2/components/docker/default/sqlserver.yaml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: dapr-state-store +spec: + type: state.sqlserver + metadata: + - name: connectionString + value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;Connection Timeout=5;" + - name: databaseName + value: certificationtest_v2 diff --git a/tests/certification/state/sqlserver/v2/config.yaml b/tests/certification/state/sqlserver/v2/config.yaml new file mode 100644 index 0000000000..6c95e632ff --- /dev/null +++ b/tests/certification/state/sqlserver/v2/config.yaml @@ -0,0 +1,6 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: keyvaultconfig +spec: + features: diff --git a/tests/certification/state/sqlserver/v2/docker-compose.yml b/tests/certification/state/sqlserver/v2/docker-compose.yml new file mode 100644 index 0000000000..63510b7688 --- /dev/null +++ b/tests/certification/state/sqlserver/v2/docker-compose.yml @@ -0,0 +1,9 @@ +version: "3.7" +services: + sqlserver: + image: mcr.microsoft.com/mssql/server:2019-latest + ports: + - "1433:1433" + environment: + ACCEPT_EULA: Y + SA_PASSWORD: "Pass@Word1" diff --git a/tests/certification/state/sqlserver/v2/sqlserver_test.go b/tests/certification/state/sqlserver/v2/sqlserver_test.go new file mode 100644 index 0000000000..2bfaa97701 --- /dev/null +++ b/tests/certification/state/sqlserver/v2/sqlserver_test.go @@ -0,0 +1,664 @@ +/* +Copyright 2021 The Dapr Authors 
+Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqlserver_test + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/contenttype" + // State. + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + state_sqlserver "github.com/dapr/components-contrib/state/sqlserver/v2" + state_loader "github.com/dapr/dapr/pkg/components/state" + "github.com/dapr/kit/logger" + + // Secret stores. 
+ secretstore_env "github.com/dapr/components-contrib/secretstores/local/env" + secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores" + + // Dapr runtime and Go-SDK + + dapr_testing "github.com/dapr/dapr/pkg/testing" + "github.com/dapr/go-sdk/client" + + // Certification testing runnables + "github.com/dapr/components-contrib/tests/certification/embedded" + "github.com/dapr/components-contrib/tests/certification/flow" + "github.com/dapr/components-contrib/tests/certification/flow/dockercompose" + "github.com/dapr/components-contrib/tests/certification/flow/network" + "github.com/dapr/components-contrib/tests/certification/flow/retry" + "github.com/dapr/components-contrib/tests/certification/flow/sidecar" +) + +const ( + sidecarNamePrefix = "sqlserver-sidecar-" + dockerComposeYAML = "docker-compose.yml" + stateStoreName = "dapr-state-store" + certificationTestPrefix = "stable-certification-" + dockerConnectionString = "server=localhost;user id=sa;password=Pass@Word1;port=1433;" +) + +func TestSqlServer(t *testing.T) { + // The default certificate created by the docker container sometimes contains a negative serial number. + // A TLS certificate with a negative serial number is invalid, although it was tolerated until 1.22 + // Since Go 1.23 the default behavior has changed and the certificate is rejected. + // This environment variable is used to revert to the old behavior. 
+ // Ref: https://github.com/microsoft/mssql-docker/issues/895 + oldDebugValue := os.Getenv("GODEBUG") + t.Setenv("GODEBUG", "x509negativeserial=1") + + if os.Getenv("GODEBUG") != "x509negativeserial=1" { + t.Fatal("Failed to set GODEBUG environment variable, actual value: " + os.Getenv("GODEBUG")) + } + defer func() { + t.Setenv("GODEBUG", oldDebugValue) + }() + + ports, err := dapr_testing.GetFreePorts(2) + require.NoError(t, err) + + currentGrpcPort := ports[0] + currentHTTPPort := ports[1] + + basicTest := func(ctx flow.Context) error { + ctx.T.Run("basic test", func(t *testing.T) { + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) + if err != nil { + panic(err) + } + defer client.Close() + + // save state, default options: strong, last-write + err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("certificationdata"), nil) + require.NoError(t, err) + + // get state + item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) + require.NoError(t, err) + assert.Equal(t, "certificationdata", string(item.Value)) + + // delete state + err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) + require.NoError(t, err) + }) + return nil + } + + basicTTLTest := func(ctx flow.Context) error { + ctx.T.Run("basic TTL test", func(t *testing.T) { + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) + if err != nil { + panic(err) + } + defer client.Close() + + err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key2", []byte("certificationdata"), map[string]string{ + "ttlInSeconds": "86400", + }) + require.NoError(t, err) + + // get state + item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key2", nil) + require.NoError(t, err) + assert.Equal(t, "certificationdata", string(item.Value)) + assert.Contains(t, item.Metadata, "ttlExpireTime") + expireTime, err := time.Parse(time.RFC3339, 
item.Metadata["ttlExpireTime"]) + require.NoError(t, err) + assert.InDelta(t, time.Now().Add(24*time.Hour).Unix(), expireTime.Unix(), 10) + + err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key2", []byte("certificationdata"), map[string]string{ + "ttlInSeconds": "1", + }) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"key2", nil) + require.NoError(t, err) + assert.Nil(t, item.Value) + assert.Nil(t, item.Metadata) + }) + + return nil + } + + // this test function heavily depends on the values defined in ./components/docker/customschemawithindex + verifyIndexedPopertiesTest := func(ctx flow.Context) error { + // verify indices were created by Dapr as specified in the component metadata + db, err := sql.Open("mssql", fmt.Sprintf("%sdatabase=certificationtest_v2;", dockerConnectionString)) + require.NoError(ctx.T, err) + defer db.Close() + + rows, err := db.Query("sp_helpindex '[customschema].[mystates]'") + require.NoError(ctx.T, err) + assert.NoError(ctx.T, rows.Err()) + defer rows.Close() + + indexFoundCount := 0 + for rows.Next() { + var indexedField, otherdata1, otherdata2 string + err = rows.Scan(&indexedField, &otherdata1, &otherdata2) + assert.NoError(ctx.T, err) + + expectedIndices := []string{"IX_customerid", "IX_transactionid", "PK_mystates"} + for _, item := range expectedIndices { + if item == indexedField { + indexFoundCount++ + break + } + } + } + assert.Equal(ctx.T, 3, indexFoundCount) + + // write JSON data to the state store (which will automatically be indexed in separate columns) + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) + if err != nil { + panic(err) + } + defer client.Close() + + order := struct { + ID int `json:"id"` + Customer string `json:"customer"` + Description string `json:"description"` + }{123456, "John Doe", "something"} + + data, err := json.Marshal(order) + assert.NoError(ctx.T, err) + + // save 
state with the key certificationkey1, default options: strong, last-write + err = client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", data, map[string]string{metadata.ContentType: contenttype.JSONContentType}) + require.NoError(ctx.T, err) + require.NoError(ctx.T, err) + + // get state for key certificationkey1 + item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) + assert.NoError(ctx.T, err) + assert.JSONEq(ctx.T, string(data), string(item.Value)) + + // check that Dapr wrote the indexed properties to separate columns + rows, err = db.Query("SELECT TOP 1 transactionid, customerid FROM [customschema].[mystates];") + assert.NoError(ctx.T, err) + assert.NoError(ctx.T, rows.Err()) + defer rows.Close() + if rows.Next() { + var transactionID int + var customerID string + err = rows.Scan(&transactionID, &customerID) + assert.NoError(ctx.T, err) + assert.Equal(ctx.T, transactionID, order.ID) + assert.Equal(ctx.T, customerID, order.Customer) + } else { + assert.Fail(ctx.T, "no rows returned") + } + + // delete state for key certificationkey1 + err = client.DeleteState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) + assert.NoError(ctx.T, err) + + return nil + } + + // helper function for testing the use of an existing custom schema + createCustomSchema := func(ctx flow.Context) error { + db, err := sql.Open("mssql", dockerConnectionString) + assert.NoError(ctx.T, err) + _, err = db.Exec("CREATE SCHEMA customschema;") + assert.NoError(ctx.T, err) + db.Close() + return nil + } + + // helper function to insure the SQL Server Docker Container is truly ready + checkSQLServerAvailability := func(ctx flow.Context) error { + db, err := sql.Open("mssql", dockerConnectionString) + if err != nil { + return err + } + _, err = db.Exec("SELECT * FROM INFORMATION_SCHEMA.TABLES;") + if err != nil { + return err + } + return nil + } + + // checks the state store component is not vulnerable to SQL injection + 
verifySQLInjectionTest := func(ctx flow.Context) error { + client, err := client.NewClientWithPort(strconv.Itoa(currentGrpcPort)) + if err != nil { + panic(err) + } + defer client.Close() + + // common SQL injection techniques for SQL Server + sqlInjectionAttempts := []string{ + "; DROP states--", + "dapr' OR '1'='1", + } + + for _, sqlInjectionAttempt := range sqlInjectionAttempts { + // save state with sqlInjectionAttempt's value as key, default options: strong, last-write + err = client.SaveState(ctx, stateStoreName, sqlInjectionAttempt, []byte(sqlInjectionAttempt), nil) + assert.NoError(ctx.T, err) + + // get state for key sqlInjectionAttempt's value + item, err := client.GetState(ctx, stateStoreName, sqlInjectionAttempt, nil) + assert.NoError(ctx.T, err) + assert.Equal(ctx.T, sqlInjectionAttempt, string(item.Value)) + + // delete state for key sqlInjectionAttempt's value + err = client.DeleteState(ctx, stateStoreName, sqlInjectionAttempt, nil) + assert.NoError(ctx.T, err) + } + + return nil + } + + // Validates TTLs and garbage collections + ttlTest := func(connString string) func(ctx flow.Context) error { + return func(ctx flow.Context) error { + log := logger.NewLogger("dapr.components") + + ctx.T.Run("parse cleanupIntervalInSeconds", func(t *testing.T) { + t.Run("default value", func(t *testing.T) { + // Default value is 1 hr + md := state.Metadata{ + Base: metadata.Base{ + Name: "ttltest", + Properties: map[string]string{ + "connectionString": connString, + "databaseName": "certificationtest", + "tableName": "ttltest", + "metadataTableName": "ttltest_metadata", + "schema": "ttlschema", + }, + }, + } + storeObj := state_sqlserver.New(log).(*state_sqlserver.SQLServer) + + err := storeObj.Init(t.Context(), md) + require.NoError(t, err, "failed to init") + defer storeObj.Close() + + cleanupInterval := storeObj.GetCleanupInterval() + require.NotNil(t, cleanupInterval) + assert.Equal(t, time.Duration(1*time.Hour), *cleanupInterval) + }) + + t.Run("positive 
value", func(t *testing.T) { + // A positive value is interpreted in seconds + md := state.Metadata{ + Base: metadata.Base{ + Name: "ttltest", + Properties: map[string]string{ + "connectionString": connString, + "databaseName": "certificationtest", + "tableName": "ttltest", + "metadataTableName": "ttltest_metadata", + "schema": "ttlschema", + "cleanupInterval": "10", + }, + }, + } + storeObj := state_sqlserver.New(log).(*state_sqlserver.SQLServer) + + err := storeObj.Init(t.Context(), md) + require.NoError(t, err, "failed to init") + defer storeObj.Close() + + cleanupInterval := storeObj.GetCleanupInterval() + require.NotNil(t, cleanupInterval) + assert.Equal(t, time.Duration(10*time.Second), *cleanupInterval) + }) + + t.Run("disabled", func(t *testing.T) { + // A value of <=0 means that the cleanup is disabled + md := state.Metadata{ + Base: metadata.Base{ + Name: "ttltest", + Properties: map[string]string{ + "connectionString": connString, + "databaseName": "certificationtest", + "tableName": "ttltest", + "metadataTableName": "ttltest_metadata", + "schema": "ttlschema", + "cleanupIntervalInSeconds": "0", + }, + }, + } + storeObj := state_sqlserver.New(log).(*state_sqlserver.SQLServer) + + err := storeObj.Init(t.Context(), md) + require.NoError(t, err, "failed to init") + defer storeObj.Close() + + cleanupInterval := storeObj.GetCleanupInterval() + assert.Nil(t, cleanupInterval) + }) + }) + + ctx.T.Run("cleanup", func(t *testing.T) { + dbClient, err := sql.Open("mssql", connString) + require.NoError(t, err) + + t.Run("automatically delete expiredate records", func(t *testing.T) { + // Run every second + md := state.Metadata{ + Base: metadata.Base{ + Name: "ttltest", + Properties: map[string]string{ + "connectionString": connString, + "databaseName": "certificationtest", + "tableName": "ttltest", + "metadataTableName": "ttltest_metadata", + "schema": "ttlschema", + "cleanupInterval": "1", + }, + }, + } + + storeObj := 
state_sqlserver.New(log).(*state_sqlserver.SQLServer) + err := storeObj.Init(t.Context(), md) + require.NoError(t, err, "failed to init") + defer storeObj.Close() + + // Seed the database with some records + err = clearTable(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to clear table") + err = populateTTLRecords(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to seed records") + + cleanupInterval := storeObj.GetCleanupInterval() + require.NotNil(t, cleanupInterval) + assert.Equal(t, time.Duration(time.Second), *cleanupInterval) + + // Wait up to 3 seconds then verify we have only 10 rows left + var count int + assert.Eventually(t, func() bool { + count, err = countRowsInTable(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to run query to count rows") + return count == 10 + }, 3*time.Second, 10*time.Millisecond, "expected 10 rows, got %d", count) + + // The "last-cleanup" value should be <= 1 second (+ a bit of buffer) + lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttlschema", "ttltest_metadata") + require.NoError(t, err, "failed to load value for 'last-cleanup'") + assert.LessOrEqual(t, lastCleanup, int64(1200)) + + // Wait 6 more seconds and verify there are no more rows left + assert.Eventually(t, func() bool { + count, err = countRowsInTable(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to run query to count rows") + return count == 0 + }, 6*time.Second, 10*time.Millisecond, "expected 0 rows, got %d", count) + + // The "last-cleanup" value should be <= 1 second (+ a bit of buffer) + lastCleanup, err = loadLastCleanupInterval(ctx, dbClient, "ttlschema", "ttltest_metadata") + require.NoError(t, err, "failed to load value for 'last-cleanup'") + assert.LessOrEqual(t, lastCleanup, int64(1200)) + }) + + t.Run("cleanup concurrency", func(t *testing.T) { + // Set to run every hour + // (we'll manually trigger more frequent iterations) + md := 
state.Metadata{ + Base: metadata.Base{ + Name: "ttltest", + Properties: map[string]string{ + "connectionString": connString, + "databaseName": "certificationtest", + "tableName": "ttltest", + "metadataTableName": "ttltest_metadata", + "schema": "ttlschema", + "cleanupInterval": "1h", + }, + }, + } + + storeObj := state_sqlserver.New(log).(*state_sqlserver.SQLServer) + err := storeObj.Init(t.Context(), md) + require.NoError(t, err, "failed to init") + defer storeObj.Close() + + cleanupInterval := storeObj.GetCleanupInterval() + require.NotNil(t, cleanupInterval) + assert.Equal(t, time.Hour, *cleanupInterval) + + // Seed the database with some records + err = clearTable(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to clear table") + err = populateTTLRecords(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to seed records") + + // Validate that 20 records are present + count, err := countRowsInTable(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to run query to count rows") + assert.Equal(t, 20, count) + + // Set last-cleanup to 1s ago + _, err = dbClient.ExecContext(ctx, `UPDATE [ttlschema].[ttltest_metadata] SET [Value] = CONVERT(nvarchar(MAX), DATEADD(second, -1, GETDATE()), 21) WHERE [Key] = 'last-cleanup'`) + require.NoError(t, err, "failed to set last-cleanup") + + // The "last-cleanup" value + lastCleanup, err := loadLastCleanupInterval(ctx, dbClient, "ttlschema", "ttltest_metadata") + require.NoError(t, err, "failed to load value for 'last-cleanup'") + assert.LessOrEqual(t, lastCleanup, int64(1200)) + lastCleanupValueOrig, err := getValueFromMetadataTable(ctx, dbClient, "ttlschema", "ttltest_metadata", "'last-cleanup'") + require.NoError(t, err, "failed to load absolute value for 'last-cleanup'") + require.NotEmpty(t, lastCleanupValueOrig) + + // Trigger the background cleanup, which should do nothing because the last cleanup was < 3600s + require.NoError(t, storeObj.CleanupExpired(), 
"CleanupExpired returned an error") + + // Validate that 20 records are still present + count, err = countRowsInTable(ctx, dbClient, "ttlschema", "ttltest") + require.NoError(t, err, "failed to run query to count rows") + assert.Equal(t, 20, count) + + // The "last-cleanup" value should not have been changed + lastCleanupValue, err := getValueFromMetadataTable(ctx, dbClient, "ttlschema", "ttltest_metadata", "'last-cleanup'") + require.NoError(t, err, "failed to load absolute value for 'last-cleanup'") + assert.Equal(t, lastCleanupValueOrig, lastCleanupValue) + }) + }) + + return nil + } + } + + t.Run("SQLServer certification using SQL Server Docker", func(t *testing.T) { + flow.New(t, "SQLServer certification using SQL Server Docker"). + // Run SQL Server using Docker Compose. + Step(dockercompose.Run("sqlserver", dockerComposeYAML)). + Step("wait for SQL Server readiness", retry.Do(time.Second*3, 10, checkSQLServerAvailability)). + + // Run the Dapr sidecar with the SQL Server component. + Step(sidecar.Run(sidecarNamePrefix+"dockerDefault", + append(componentRuntimeOptions(), + embedded.WithoutApp(), + embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)), + embedded.WithResourcesPath("components/docker/default"), + embedded.WithProfilingEnabled(false), + )..., + )). + Step("Run basic test", basicTest). + Step("Run basic TTL test", basicTTLTest). + // Introduce network interruption of 10 seconds + // Note: the connection timeout is set to 5 seconds via the component metadata connection string. + Step("interrupt network", + network.InterruptNetwork(10*time.Second, nil, nil, "1433", "1434")). + + // Component should recover at this point. + Step("wait", flow.Sleep(5*time.Second)). + Step("Run basic test again to verify reconnection occurred", basicTest). + Step("Run SQL injection test", verifySQLInjectionTest, sidecar.Stop(sidecarNamePrefix+"dockerDefault")). 
+ Step("run TTL test", ttlTest(dockerConnectionString+"database=certificationtest_v2;")). + Step("Stopping SQL Server Docker container", dockercompose.Stop("sqlserver", dockerComposeYAML)). + Run() + }) + + ports, err = dapr_testing.GetFreePorts(2) + require.NoError(t, err) + + currentGrpcPort = ports[0] + currentHTTPPort = ports[1] + + t.Run("Using existing custom schema with indexed data", func(t *testing.T) { + flow.New(t, "Using existing custom schema with indexed data"). + // Run SQL Server using Docker Compose. + Step(dockercompose.Run("sqlserver", dockerComposeYAML)). + Step("wait for SQL Server readiness", retry.Do(time.Second*3, 10, checkSQLServerAvailability)). + Step("Creating schema", createCustomSchema). + + // Run the Dapr sidecar with the SQL Server component. + Step(sidecar.Run(sidecarNamePrefix+"dockerCustomSchema", + append(componentRuntimeOptions(), + embedded.WithoutApp(), + embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)), + embedded.WithResourcesPath("components/docker/customschemawithindex"), + embedded.WithProfilingEnabled(false), + )..., + )). + Step("Run indexed properties verification test", verifyIndexedPopertiesTest, sidecar.Stop(sidecarNamePrefix+"dockerCustomSchema")). + Step("Stopping SQL Server Docker container", dockercompose.Stop("sqlserver", dockerComposeYAML)). + Run() + }) + + ports, err = dapr_testing.GetFreePorts(2) + require.NoError(t, err) + + currentGrpcPort = ports[0] + currentHTTPPort = ports[1] + + t.Run("SQL Server certification using Azure SQL", func(t *testing.T) { + flow.New(t, "SQL Server certification using Azure SQL"). + // Run the Dapr sidecar with the SQL Server component. 
+ Step(sidecar.Run(sidecarNamePrefix+"azure", + append(componentRuntimeOptions(), + embedded.WithoutApp(), + embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)), + embedded.WithResourcesPath("components/azure"), + embedded.WithProfilingEnabled(false), + )..., + )). + Step("Run basic test", basicTest). + Step("Run basic TTL test", basicTTLTest). + Step("interrupt network", + network.InterruptNetwork(15*time.Second, nil, nil, "1433", "1434")). + + // Component should recover at this point. + Step("wait", flow.Sleep(10*time.Second)). + Step("Run basic test again to verify reconnection occurred", basicTest). + Step("Run SQL injection test", verifySQLInjectionTest, sidecar.Stop(sidecarNamePrefix+"azure")). + Step("run TTL test", ttlTest(os.Getenv("AzureSqlServerConnectionString")+"database=stablecertification_v2;")). + Run() + }) +} + +func componentRuntimeOptions() []embedded.Option { + log := logger.NewLogger("dapr.components") + + stateRegistry := state_loader.NewRegistry() + stateRegistry.Logger = log + stateRegistry.RegisterComponent(state_sqlserver.New, "sqlserver") + + secretstoreRegistry := secretstores_loader.NewRegistry() + secretstoreRegistry.Logger = log + secretstoreRegistry.RegisterComponent(secretstore_env.NewEnvSecretStore, "local.env") + + return []embedded.Option{ + embedded.WithStates(stateRegistry), + embedded.WithSecretStores(secretstoreRegistry), + } +} + +func countRowsInTable(ctx context.Context, dbClient *sql.DB, schema, table string) (count int, err error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + err = dbClient.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM [%s].[%s]", schema, table)).Scan(&count) + return +} + +func clearTable(ctx context.Context, dbClient *sql.DB, schema, table string) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + _, err := dbClient.ExecContext(ctx, fmt.Sprintf("DELETE FROM 
[%s].[%s]", schema, table)) + return err +} + +func loadLastCleanupInterval(ctx context.Context, dbClient *sql.DB, schema, table string) (lastCleanup int64, err error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + err = dbClient. + QueryRowContext(ctx, + fmt.Sprintf("SELECT DATEDIFF(MILLISECOND, CAST([Value] AS DATETIME2), GETDATE()) FROM [%s].[%s] WHERE [Key] = 'last-cleanup'", schema, table), + ). + Scan(&lastCleanup) + return +} + +func populateTTLRecords(ctx context.Context, dbClient *sql.DB, schema, table string) error { + // Insert 10 records that have expired, and 10 that will expire in 4 + // seconds. + rows := make([][]any, 20) + for i := 0; i < 10; i++ { + rows[i] = []any{ + fmt.Sprintf("'expired_%d'", i), + json.RawMessage(fmt.Sprintf("'value_%d'", i)), + "DATEADD(MINUTE, -1, GETDATE())", + } + } + for i := 0; i < 10; i++ { + rows[i+10] = []any{ + fmt.Sprintf("'notexpired_%d'", i), + json.RawMessage(fmt.Sprintf(`'value_%d'`, i)), + "DATEADD(SECOND, 4, GETDATE())", + } + } + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + for _, row := range rows { + _, err := dbClient.ExecContext(ctx, fmt.Sprintf( + "INSERT INTO [%[1]s].[%[2]s] ([Key], [Data], [ExpireDate]) VALUES (%[3]s, %[4]s, %[5]s)", + schema, table, row[0], row[1], row[2]), + ) + if err != nil { + return err + } + } + return nil +} + +func getValueFromMetadataTable(ctx context.Context, dbClient *sql.DB, schema, table, key string) (value string, err error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + err = dbClient. + QueryRowContext(ctx, fmt.Sprintf("SELECT [Value] FROM [%[1]s].[%[2]s] WHERE [Key] = %[3]s", schema, table, key)). 
+ Scan(&value) + return +} diff --git a/tests/config/state/sqlserver/docker/statestore.yml b/tests/config/state/sqlserver/docker/statestore.yml new file mode 100644 index 0000000000..98d1403d33 --- /dev/null +++ b/tests/config/state/sqlserver/docker/statestore.yml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.sqlserver + metadata: + - name: connectionString + value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;" + - name: tableName + value: mytable diff --git a/tests/config/state/sqlserver/v2/docker/statestore.yml b/tests/config/state/sqlserver/v2/docker/statestore.yml new file mode 100644 index 0000000000..2dfa4b8451 --- /dev/null +++ b/tests/config/state/sqlserver/v2/docker/statestore.yml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.sqlserver + metadata: + - name: connectionString + value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;" + - name: tableName + value: mytable_v2 diff --git a/tests/config/state/sqlserver/v2/statestore.yml b/tests/config/state/sqlserver/v2/statestore.yml new file mode 100644 index 0000000000..2dfa4b8451 --- /dev/null +++ b/tests/config/state/sqlserver/v2/statestore.yml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.sqlserver + metadata: + - name: connectionString + value: "server=localhost;user id=sa;password=Pass@Word1;port=1433;" + - name: tableName + value: mytable_v2 diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index ed0f60de09..d176a0397e 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -4,18 +4,18 @@ componentType: state components: - component: redis.v6 - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "actorStateStore" ] config: # This component 
requires etags to be numeric badEtag: "9999999" - component: redis.v7 # "query" is not included because redisjson hasn't been updated to Redis v7 yet - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] config: # This component requires etags to be numeric badEtag: "9999999" - component: mongodb - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "actorStateStore" ] - component: memcached operations: [ "ttl" ] - component: azure.cosmosdb @@ -36,32 +36,47 @@ components: config: # This component requires etags to be hex-encoded numbers badEtag: "FFFF" + - component: sqlserver.v2 + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] + config: + # This component requires etags to be hex-encoded numbers + badEtag: "FFFF" + - component: sqlserver.docker + operations: [ "transaction", "etag", "first-write", "ttl" ] + config: + # This component requires etags to be hex-encoded numbers + badEtag: "FFFF" + - component: sqlserver.v2.docker + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] + config: + # This component requires etags to be hex-encoded numbers + badEtag: "FFFF" - component: postgresql.v1.docker - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "actorStateStore" ] config: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v1.azure - operations: [ "transaction", "etag", "first-write", "query", "ttl" ] + operations: [ "transaction", "etag", "first-write", "query", "ttl", "actorStateStore" ] config: # This component requires etags to be numeric badEtag: "1" - component: postgresql.v2.docker - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", 
"actorStateStore" ] config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" - component: postgresql.v2.azure - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] config: # This component requires etags to be UUIDs badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70" - component: sqlite - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] - component: mysql.mysql - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] - component: mysql.mariadb - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] - component: azure.tablestorage.storage operations: [ "etag", "first-write"] config: @@ -73,7 +88,7 @@ components: # This component requires etags to be in this format badEtag: "W/\"datetime'2023-05-09T12%3A28%3A54.1442151Z'\"" - component: oracledatabase - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] - component: cassandra operations: [ "ttl" ] - component: cloudflare.workerskv @@ -92,16 +107,16 @@ components: - component: rethinkdb operations: [] - component: in-memory - operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix" ] + operations: [ "transaction", "etag", "first-write", "ttl", "delete-with-prefix", "actorStateStore" ] - component: aws.dynamodb.docker # In the Docker variant, we do not set ttlAttributeName in the metadata, so TTLs are not enabled operations: [ "transaction", "etag", "first-write" ] - component: aws.dynamodb.terraform operations: [ "transaction", "etag", "first-write", "ttl" ] - component: etcd.v1 - operations: [ "transaction", "etag", 
"first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] - component: etcd.v2 - operations: [ "transaction", "etag", "first-write", "ttl" ] + operations: [ "transaction", "etag", "first-write", "ttl", "actorStateStore" ] - component: gcp.firestore.docker operations: [] - component: gcp.firestore.cloud diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index 00b6074928..200762207f 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -27,7 +27,9 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "github.com/dapr/components-contrib/common/proto/state/sqlserver" "github.com/dapr/components-contrib/contenttype" "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/state" @@ -452,6 +454,36 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St } }) + t.Run("set and get proto", func(t *testing.T) { + if !config.HasOperation("actorStateStore") { + t.Skipf("skipping test for %s", config.ComponentName) + } + + protoBytes, err := proto.Marshal(&sqlserver.TestEvent{ + EventId: -1, + }) + require.NoError(t, err) + + err = statestore.Set(t.Context(), &state.SetRequest{ + Key: key + "-proto", + Value: protoBytes, + }) + require.NoError(t, err) + + // Request immediately + res, err := statestore.Get(t.Context(), &state.GetRequest{ + Key: key + "-proto", + }) + require.NoError(t, err) + assertEquals(t, protoBytes, res) + + response := &sqlserver.TestEvent{} + err = proto.Unmarshal(res.Data, response) + require.NoError(t, err) + + assert.EqualValues(t, -1, response.GetEventId()) + }) + if config.HasOperation("query") { t.Run("query", func(t *testing.T) { // Check if query feature is listed @@ -1340,6 +1372,11 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St } t.Run("set and get expire 
time", func(t *testing.T) { + if config.ComponentName == "sqlserver" || + config.ComponentName == "sqlserver.docker" { + t.Skip() + } + now := time.Now() err := statestore.Set(t.Context(), &state.SetRequest{ Key: key + "-ttl-expire-time", diff --git a/tests/conformance/state_test.go b/tests/conformance/state_test.go index e102a68b27..e17f3664ec 100644 --- a/tests/conformance/state_test.go +++ b/tests/conformance/state_test.go @@ -45,6 +45,7 @@ import ( s_rethinkdb "github.com/dapr/components-contrib/state/rethinkdb" s_sqlite "github.com/dapr/components-contrib/state/sqlite" s_sqlserver "github.com/dapr/components-contrib/state/sqlserver" + s_sqlserver_v2 "github.com/dapr/components-contrib/state/sqlserver/v2" conf_state "github.com/dapr/components-contrib/tests/conformance/state" ) @@ -93,6 +94,12 @@ func loadStateStore(name string) state.Store { return s_sqlserver.New(testLogger) case "sqlserver": return s_sqlserver.New(testLogger) + case "sqlserver.v2": + return s_sqlserver_v2.New(testLogger) + case "sqlserver.docker": + return s_sqlserver.New(testLogger) + case "sqlserver.v2.docker": + return s_sqlserver_v2.New(testLogger) case "postgresql.v1.docker": return s_postgresql_v1.NewPostgreSQLStateStore(testLogger) case "postgresql.v1.azure":