Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions state/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
FeatureQueryAPI Feature = "QUERY_API"
// FeatureTTL is the feature that supports TTLs.
FeatureTTL Feature = "TTL"
// FeatureDeleteWithPrefix is the feature that supports deleting with prefix.
FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX"
)

// Feature names a feature that can be implemented by state store components.
Expand Down
18 changes: 18 additions & 0 deletions state/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ limitations under the License.
package state

import (
"fmt"
"strings"

"github.com/dapr/components-contrib/state/query"
)

Expand Down Expand Up @@ -67,6 +70,21 @@ func (r DeleteRequest) Operation() OperationType {
return OperationDelete
}

// DeleteWithPrefixRequest is the object describing a delete with prefix state request used for deleting actors.
type DeleteWithPrefixRequest struct {
Prefix string `json:"prefix"`
}

func (r *DeleteWithPrefixRequest) Validate() error {
if r.Prefix == "" {
return fmt.Errorf("a prefix is required for deleteWithPrefix request")
}
if !strings.HasSuffix(r.Prefix, "||") {
r.Prefix += "||"
}
return nil
}

// DeleteStateOption controls how a state store reacts to a delete request.
type DeleteStateOption struct {
Concurrency string `json:"concurrency,omitempty"` // "concurrency"
Expand Down
5 changes: 5 additions & 0 deletions state/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ type QueryItem struct {
Error string `json:"error,omitempty"`
ContentType *string `json:"contentType,omitempty"`
}

// DeleteWithPrefixResponse is the object representing a delete with prefix state response containing the number of items removed.
type DeleteWithPrefixResponse struct {
Count int64 `json:"count"` // count of items removed
}
6 changes: 6 additions & 0 deletions state/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func newSQLiteStateStore(logger logger.Logger, dba DBAccess) *SQLiteStore {
state.FeatureETag,
state.FeatureTransactional,
state.FeatureTTL,
state.FeatureDeleteWithPrefix,
},
dbaccess: dba,
}
Expand Down Expand Up @@ -100,6 +101,11 @@ func (s *SQLiteStore) Multi(ctx context.Context, request *state.TransactionalSta
return s.dbaccess.ExecuteMulti(ctx, request.Operations)
}

// DeleteWithPrefix deletes an actor's state
func (s *SQLiteStore) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
return s.dbaccess.DeleteWithPrefix(ctx, req)
}

// Close implements io.Closer.
func (s *SQLiteStore) Close() error {
if s.dbaccess != nil {
Expand Down
30 changes: 30 additions & 0 deletions state/sqlite/sqlite_dbaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type DBAccess interface {
Delete(ctx context.Context, req *state.DeleteRequest) error
BulkGet(ctx context.Context, req []state.GetRequest) ([]state.BulkGetResponse, error)
ExecuteMulti(ctx context.Context, reqs []state.TransactionalStateOperation) error
DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error)
Close() error
}

Expand Down Expand Up @@ -78,6 +79,7 @@ func (a *sqliteDBAccess) Init(ctx context.Context, md state.Metadata) error {
return err
}

registerFuntions()
connString, err := a.metadata.GetConnectionString(a.logger, sqlite.GetConnectionStringOpts{})
if err != nil {
// Already logged
Expand Down Expand Up @@ -418,6 +420,34 @@ func (a *sqliteDBAccess) Delete(ctx context.Context, req *state.DeleteRequest) e
return a.doDelete(ctx, a.db, req)
}

func (a *sqliteDBAccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
if req.Prefix == "" {
return state.DeleteWithPrefixResponse{}, fmt.Errorf("missing prefix in delete with prefix operation")
}
ctx, cancel := context.WithTimeout(ctx, a.metadata.Timeout)
defer cancel()

err := req.Validate()
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

// Concatenation is required for table name because sql.DB does not substitute parameters for table names.
//nolint:gosec
result, err := a.db.ExecContext(ctx, "DELETE FROM "+a.metadata.TableName+" WHERE prefix = ?",
req.Prefix)
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

rows, err := result.RowsAffected()
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

return state.DeleteWithPrefixResponse{Count: rows}, nil
}

func (a *sqliteDBAccess) ExecuteMulti(parentCtx context.Context, reqs []state.TransactionalStateOperation) error {
tx, err := a.db.BeginTx(parentCtx, nil)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions state/sqlite/sqlite_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func TestSqliteIntegration(t *testing.T) {
multiWithSetOnly(t, s)
})

t.Run("Delete with Prefix (actor state)", func(t *testing.T) {
testDeleteWithPrefix(t, s)
})

t.Run("ttlExpireTime", func(t *testing.T) {
getExpireTime(t, s)
getBulkExpireTime(t, s)
Expand Down Expand Up @@ -610,6 +614,55 @@ func setItemWithNoKey(t *testing.T, s state.Store) {
require.Error(t, err)
}

func testDeleteWithPrefix(t *testing.T, s state.Store) {
setReq1 := &state.SetRequest{
Key: "mock-app-id||mock-actor-type||mock-actor-id||key0",
}

setReq2 := &state.SetRequest{
Key: "mock-app-id||mock-actor-type||mock-actor-id||key1",
}

setReq3 := &state.SetRequest{
Key: "mock-app-id||mock-actor-type||mock-actor-id||key2",
}

setReq4 := &state.SetRequest{
Key: "different-app-id||different-actor-type||different-actor-id||key0",
}

delReq := state.DeleteWithPrefixRequest{
Prefix: "mock-app-id||mock-actor-type||mock-actor-id",
}

err := s.Set(context.Background(), setReq1)
require.NoError(t, err)

err = s.Set(context.Background(), setReq2)
require.NoError(t, err)

err = s.Set(context.Background(), setReq3)
require.NoError(t, err)

err = s.Set(context.Background(), setReq4)
require.NoError(t, err)

res, err := s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
require.NoError(t, err)
assert.Equal(t, int64(3), res.Count)

delReq = state.DeleteWithPrefixRequest{
Prefix: "different-app-id||different-actor-type||different-actor-id||",
}
res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
require.NoError(t, err)
assert.Equal(t, int64(1), res.Count)

res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
require.NoError(t, err)
assert.Equal(t, int64(0), res.Count)
}

func testSetItemWithInvalidTTL(t *testing.T, s state.Store) {
setReq := &state.SetRequest{
Key: randomKey(),
Expand Down
49 changes: 48 additions & 1 deletion state/sqlite/sqlite_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,47 @@ package sqlite
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"strings"

commonsql "github.com/dapr/components-contrib/common/component/sql"
sqlitemigrations "github.com/dapr/components-contrib/common/component/sql/migrations/sqlite"
"github.com/dapr/kit/logger"

sqlite3 "modernc.org/sqlite"
)

type migrationOptions struct {
StateTableName string
MetadataTableName string
}

func registerFuntions() {
sqlite3.RegisterDeterministicScalarFunction(
"parse_key_prefix",
1,
func(ctx *sqlite3.FunctionContext, args []driver.Value) (driver.Value, error) {
var s1 string
switch arg0 := args[0].(type) {
case string:
s1 = arg0
default:
return "", fmt.Errorf("expected argv[0] to be text")
}
if len(s1) == 0 {
return "", fmt.Errorf("cannot create prefix for empty string")
}

lastIndex := strings.LastIndex(s1, "||")
if lastIndex != -1 {
return s1[:lastIndex+2], nil
}
return "", nil
},
)
}

// Perform the required migrations
func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, opts migrationOptions) error {
m := sqlitemigrations.Migrations{
Expand Down Expand Up @@ -61,5 +90,23 @@ func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, op
}
return nil
},
})
// Migration 1: add the "prefix" column
func(ctx context.Context) error {
// We add this virtual prefix column to enable us to delete an actor's state since this prefix will tell us which actor created it
logger.Infof("Creating virtual column for table '%s'", opts.StateTableName)
_, err := m.GetConn().ExecContext(
ctx,
fmt.Sprintf(
`ALTER TABLE %[1]s ADD COLUMN prefix TEXT GENERATED ALWAYS AS (parse_key_prefix(key)) VIRTUAL;
CREATE INDEX %[1]s_prefix_index ON %[1]s(prefix) WHERE prefix != ""`,
opts.StateTableName,
),
)
if err != nil {
return fmt.Errorf("failed to create state table: %w", err)
}
return nil
},
},
)
}
19 changes: 19 additions & 0 deletions state/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ func TestValidMultiDeleteRequest(t *testing.T) {
require.NoError(t, err)
}

func TestValidEmptyDeleteWithPrefixRequest(t *testing.T) {
t.Parallel()

ods := createSqlite(t)
res, err := ods.DeleteWithPrefix(context.Background(), createDeleteWithPrefixRequest())
require.NoError(t, err)
assert.Equal(t, int64(0), res.Count)
}

// Proves that the Ping method runs the ping method.
func TestPingRunsDBAccessPing(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -338,6 +347,10 @@ func (m *fakeDBaccess) BulkGet(parentCtx context.Context, req []state.GetRequest
return nil, nil
}

func (m *fakeDBaccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
return state.DeleteWithPrefixResponse{}, nil
}

func (m *fakeDBaccess) Delete(ctx context.Context, req *state.DeleteRequest) error {
return nil
}
Expand Down Expand Up @@ -389,6 +402,12 @@ func createDeleteRequest() state.DeleteRequest {
}
}

func createDeleteWithPrefixRequest() state.DeleteWithPrefixRequest {
return state.DeleteWithPrefixRequest{
Prefix: randomKey(),
}
}

func createSqliteWithFake(t *testing.T) (*SQLiteStore, *fakeDBaccess) {
ods := createSqlite(t)
fake := ods.dbaccess.(*fakeDBaccess)
Expand Down
5 changes: 5 additions & 0 deletions state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,8 @@ func Ping(ctx context.Context, store Store) error {
return errors.New("ping is not implemented by this state store")
}
}

// DeleteWithPrefix is an interface to delete objects with a prefix.
type DeleteWithPrefix interface {
DeleteWithPrefix(ctx context.Context, req DeleteWithPrefixRequest) (DeleteWithPrefixResponse, error)
}