diff --git a/state/feature.go b/state/feature.go index 9a53370076..6cb38ec8ae 100644 --- a/state/feature.go +++ b/state/feature.go @@ -26,6 +26,8 @@ const ( FeatureQueryAPI Feature = "QUERY_API" // FeatureTTL is the feature that supports TTLs. FeatureTTL Feature = "TTL" + // FeatureDeleteWithPrefix is the feature that supports deleting with prefix. + FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX" ) // Feature names a feature that can be implemented by state store components. diff --git a/state/requests.go b/state/requests.go index f743088652..498763b77c 100644 --- a/state/requests.go +++ b/state/requests.go @@ -14,6 +14,9 @@ limitations under the License. package state import ( + "fmt" + "strings" + "github.com/dapr/components-contrib/state/query" ) @@ -67,6 +70,21 @@ func (r DeleteRequest) Operation() OperationType { return OperationDelete } +// DeleteWithPrefixRequest is the object describing a delete with prefix state request used for deleting actors. +type DeleteWithPrefixRequest struct { + Prefix string `json:"prefix"` +} + +func (r *DeleteWithPrefixRequest) Validate() error { + if r.Prefix == "" { + return fmt.Errorf("a prefix is required for deleteWithPrefix request") + } + if !strings.HasSuffix(r.Prefix, "||") { + r.Prefix += "||" + } + return nil +} + // DeleteStateOption controls how a state store reacts to a delete request. type DeleteStateOption struct { Concurrency string `json:"concurrency,omitempty"` // "concurrency" diff --git a/state/responses.go b/state/responses.go index 28e56ffcf8..2cb7564564 100644 --- a/state/responses.go +++ b/state/responses.go @@ -52,3 +52,8 @@ type QueryItem struct { Error string `json:"error,omitempty"` ContentType *string `json:"contentType,omitempty"` } + +// DeleteWithPrefixResponse is the object representing a delete with prefix state response containing the number of items removed. +type DeleteWithPrefixResponse struct { + Count int64 `json:"count"` // count of items removed +} diff --git a/state/sqlite/sqlite.go b/state/sqlite/sqlite.go index a9ff2ee41a..226ce4bb94 100644 --- a/state/sqlite/sqlite.go +++ b/state/sqlite/sqlite.go @@ -49,6 +49,7 @@ func newSQLiteStateStore(logger logger.Logger, dba DBAccess) *SQLiteStore { state.FeatureETag, state.FeatureTransactional, state.FeatureTTL, + state.FeatureDeleteWithPrefix, }, dbaccess: dba, } @@ -100,6 +101,11 @@ func (s *SQLiteStore) Multi(ctx context.Context, request *state.TransactionalSta return s.dbaccess.ExecuteMulti(ctx, request.Operations) } +// DeleteWithPrefix deletes an actor's state +func (s *SQLiteStore) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + return s.dbaccess.DeleteWithPrefix(ctx, req) +} + // Close implements io.Closer. func (s *SQLiteStore) Close() error { if s.dbaccess != nil { diff --git a/state/sqlite/sqlite_dbaccess.go b/state/sqlite/sqlite_dbaccess.go index 1e45f0b3cf..8e4e579506 100644 --- a/state/sqlite/sqlite_dbaccess.go +++ b/state/sqlite/sqlite_dbaccess.go @@ -45,6 +45,7 @@ type DBAccess interface { Delete(ctx context.Context, req *state.DeleteRequest) error BulkGet(ctx context.Context, req []state.GetRequest) ([]state.BulkGetResponse, error) ExecuteMulti(ctx context.Context, reqs []state.TransactionalStateOperation) error + DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) Close() error } @@ -78,6 +79,7 @@ func (a *sqliteDBAccess) Init(ctx context.Context, md state.Metadata) error { return err } + registerFuntions() connString, err := a.metadata.GetConnectionString(a.logger, sqlite.GetConnectionStringOpts{}) if err != nil { // Already logged @@ -418,6 +420,34 @@ func (a *sqliteDBAccess) Delete(ctx context.Context, req *state.DeleteRequest) e return a.doDelete(ctx, a.db, req) } +func (a *sqliteDBAccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + if req.Prefix == "" { + return state.DeleteWithPrefixResponse{}, fmt.Errorf("missing prefix in delete with prefix operation") + } + ctx, cancel := context.WithTimeout(ctx, a.metadata.Timeout) + defer cancel() + + err := req.Validate() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + // Concatenation is required for table name because sql.DB does not substitute parameters for table names. + //nolint:gosec + result, err := a.db.ExecContext(ctx, "DELETE FROM "+a.metadata.TableName+" WHERE prefix = ?", + req.Prefix) + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + rows, err := result.RowsAffected() + if err != nil { + return state.DeleteWithPrefixResponse{}, err + } + + return state.DeleteWithPrefixResponse{Count: rows}, nil +} + func (a *sqliteDBAccess) ExecuteMulti(parentCtx context.Context, reqs []state.TransactionalStateOperation) error { tx, err := a.db.BeginTx(parentCtx, nil) if err != nil { diff --git a/state/sqlite/sqlite_integration_test.go b/state/sqlite/sqlite_integration_test.go index bb376e9796..53852be74a 100644 --- a/state/sqlite/sqlite_integration_test.go +++ b/state/sqlite/sqlite_integration_test.go @@ -140,6 +140,10 @@ func TestSqliteIntegration(t *testing.T) { multiWithSetOnly(t, s) }) + t.Run("Delete with Prefix (actor state)", func(t *testing.T) { + testDeleteWithPrefix(t, s) + }) + t.Run("ttlExpireTime", func(t *testing.T) { getExpireTime(t, s) getBulkExpireTime(t, s) @@ -610,6 +614,55 @@ func setItemWithNoKey(t *testing.T, s state.Store) { require.Error(t, err) } +func testDeleteWithPrefix(t *testing.T, s state.Store) { + setReq1 := &state.SetRequest{ + Key: "mock-app-id||mock-actor-type||mock-actor-id||key0", + } + + setReq2 := &state.SetRequest{ + Key: "mock-app-id||mock-actor-type||mock-actor-id||key1", + } + + setReq3 := &state.SetRequest{ + Key: "mock-app-id||mock-actor-type||mock-actor-id||key2", + } + + setReq4 := &state.SetRequest{ + Key: "different-app-id||different-actor-type||different-actor-id||key0", + } + + delReq := state.DeleteWithPrefixRequest{ + Prefix: "mock-app-id||mock-actor-type||mock-actor-id", + } + + err := s.Set(context.Background(), setReq1) + require.NoError(t, err) + + err = s.Set(context.Background(), setReq2) + require.NoError(t, err) + + err = s.Set(context.Background(), setReq3) + require.NoError(t, err) + + err = s.Set(context.Background(), setReq4) + require.NoError(t, err) + + res, err := s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq) + require.NoError(t, err) + assert.Equal(t, int64(3), res.Count) + + delReq = state.DeleteWithPrefixRequest{ + Prefix: "different-app-id||different-actor-type||different-actor-id||", + } + res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq) + require.NoError(t, err) + assert.Equal(t, int64(1), res.Count) + + res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq) + require.NoError(t, err) + assert.Equal(t, int64(0), res.Count) +} + func testSetItemWithInvalidTTL(t *testing.T, s state.Store) { setReq := &state.SetRequest{ Key: randomKey(), diff --git a/state/sqlite/sqlite_migrations.go b/state/sqlite/sqlite_migrations.go index 78de6c69a3..5bbdbcd7c6 100644 --- a/state/sqlite/sqlite_migrations.go +++ b/state/sqlite/sqlite_migrations.go @@ -16,11 +16,15 @@ package sqlite import ( "context" "database/sql" + "database/sql/driver" "fmt" + "strings" commonsql "github.com/dapr/components-contrib/common/component/sql" sqlitemigrations "github.com/dapr/components-contrib/common/component/sql/migrations/sqlite" "github.com/dapr/kit/logger" + + sqlite3 "modernc.org/sqlite" ) type migrationOptions struct { @@ -28,6 +32,31 @@ type migrationOptions struct { MetadataTableName string } +func registerFuntions() { + sqlite3.RegisterDeterministicScalarFunction( + "parse_key_prefix", + 1, + func(ctx *sqlite3.FunctionContext, args []driver.Value) (driver.Value, error) { + var s1 string + switch arg0 := args[0].(type) { + case string: + s1 = arg0 + default: + return "", fmt.Errorf("expected argv[0] to be text") + } + if len(s1) == 0 { + return "", fmt.Errorf("cannot create prefix for empty string") + } + + lastIndex := strings.LastIndex(s1, "||") + if lastIndex != -1 { + return s1[:lastIndex+2], nil + } + return "", nil + }, + ) +} + // Perform the required migrations func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, opts migrationOptions) error { m := sqlitemigrations.Migrations{ @@ -61,5 +90,23 @@ func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, op } return nil }, - }) + // Migration 1: add the "prefix" column + func(ctx context.Context) error { + // Add the "prefix" column that can be used by DeleteWithPrefix + logger.Infof("Adding 'prefix' column to table '%s'", opts.StateTableName) + _, err := m.GetConn().ExecContext( + ctx, + fmt.Sprintf( + `ALTER TABLE %[1]s ADD COLUMN prefix TEXT GENERATED ALWAYS AS (parse_key_prefix(key)) VIRTUAL; + CREATE INDEX %[1]s_prefix_index ON %[1]s(prefix) WHERE prefix != ""`, + opts.StateTableName, + ), + ) + if err != nil { + return fmt.Errorf("failed to create virtual column: %w", err) + } + return nil + }, + }, + ) } diff --git a/state/sqlite/sqlite_test.go b/state/sqlite/sqlite_test.go index 3ffcdc3f61..177fb6c4cc 100644 --- a/state/sqlite/sqlite_test.go +++ b/state/sqlite/sqlite_test.go @@ -294,6 +294,15 @@ func TestValidMultiDeleteRequest(t *testing.T) { require.NoError(t, err) } +func TestValidEmptyDeleteWithPrefixRequest(t *testing.T) { + t.Parallel() + + ods := createSqlite(t) + res, err := ods.DeleteWithPrefix(context.Background(), createDeleteWithPrefixRequest()) + require.NoError(t, err) + assert.Equal(t, int64(0), res.Count) +} + // Proves that the Ping method runs the ping method. func TestPingRunsDBAccessPing(t *testing.T) { t.Parallel() @@ -338,6 +347,10 @@ func (m *fakeDBaccess) BulkGet(parentCtx context.Context, req []state.GetRequest return nil, nil } +func (m *fakeDBaccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) { + return state.DeleteWithPrefixResponse{}, nil +} + func (m *fakeDBaccess) Delete(ctx context.Context, req *state.DeleteRequest) error { return nil } @@ -389,6 +402,12 @@ func createDeleteRequest() state.DeleteRequest { } } +func createDeleteWithPrefixRequest() state.DeleteWithPrefixRequest { + return state.DeleteWithPrefixRequest{ + Prefix: randomKey(), + } +} + func createSqliteWithFake(t *testing.T) (*SQLiteStore, *fakeDBaccess) { ods := createSqlite(t) fake := ods.dbaccess.(*fakeDBaccess) diff --git a/state/store.go b/state/store.go index 2bb7b39c57..ceb48fcd8e 100644 --- a/state/store.go +++ b/state/store.go @@ -61,3 +61,8 @@ func Ping(ctx context.Context, store Store) error { return errors.New("ping is not implemented by this state store") } } + +// DeleteWithPrefix is an interface to delete objects with a prefix. +type DeleteWithPrefix interface { + DeleteWithPrefix(ctx context.Context, req DeleteWithPrefixRequest) (DeleteWithPrefixResponse, error) +} diff --git a/tests/certification/state/sqlite/artifacts/readonly.db b/tests/certification/state/sqlite/artifacts/readonly.db index d2772f5217..7d6f81b517 100644 Binary files a/tests/certification/state/sqlite/artifacts/readonly.db and b/tests/certification/state/sqlite/artifacts/readonly.db differ diff --git a/tests/certification/state/sqlite/artifacts/update_readonlydb.go b/tests/certification/state/sqlite/artifacts/update_readonlydb.go new file mode 100644 index 0000000000..5bb13642e1 --- /dev/null +++ b/tests/certification/state/sqlite/artifacts/update_readonlydb.go @@ -0,0 +1,38 @@ +// This small utility can be used to update the "readonly.db" file when a new migration is added to the state store component. +package main + +import ( + "context" + + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/state" + state_sqlite "github.com/dapr/components-contrib/state/sqlite" + "github.com/dapr/kit/logger" +) + +func main() { + log := logger.NewLogger("updatedb") + log.SetOutputLevel(logger.DebugLevel) + + log.Info("Initializing the component to perform migrations") + store := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore) + err := store.Init(context.Background(), state.Metadata{ + Base: metadata.Base{ + Properties: map[string]string{ + "connectionString": "file:readonly.db", + "disableWAL": "true", + }, + }, + }) + if err != nil { + log.Fatalf("Failed to perform migrations: %v", err) + } + + log.Info("Vacuuming DB") + _, err = store.GetDBAccess().GetConnection().Exec("VACUUM") + if err != nil { + log.Fatalf("Failed to vacuum database: %v", err) + } + + log.Info("You are all set 😁") +} diff --git a/tests/certification/state/sqlite/sqlite_test.go b/tests/certification/state/sqlite/sqlite_test.go index 63dd50524b..4eaefa5712 100644 --- a/tests/certification/state/sqlite/sqlite_test.go +++ b/tests/certification/state/sqlite/sqlite_test.go @@ -63,7 +63,8 @@ const ( keyBusyTimeout = "busyTimeout" // Update this constant if you add more migrations - migrationLevel = "1" + // Don't forget to also run the utility `artifacts/update_readonlydb.go` to update the read-only DB + migrationLevel = "2" ) func TestSQLite(t *testing.T) {