Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions state/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
FeatureQueryAPI Feature = "QUERY_API"
// FeatureTTL is the feature that supports TTLs.
FeatureTTL Feature = "TTL"
// FeatureDeleteWithPrefix is the feature that supports deleting with prefix.
FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX"
)

// Feature names a feature that can be implemented by state store components.
Expand Down
18 changes: 18 additions & 0 deletions state/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ limitations under the License.
package state

import (
"fmt"
"strings"

"github.com/dapr/components-contrib/state/query"
)

Expand Down Expand Up @@ -67,6 +70,21 @@ func (r DeleteRequest) Operation() OperationType {
return OperationDelete
}

// DeleteWithPrefixRequest is the object describing a delete with prefix state request used for deleting actors.
type DeleteWithPrefixRequest struct {
Prefix string `json:"prefix"`
}

func (r *DeleteWithPrefixRequest) Validate() error {
if r.Prefix == "" {
return fmt.Errorf("a prefix is required for deleteWithPrefix request")
}
if !strings.HasSuffix(r.Prefix, "||") {
r.Prefix += "||"
}
return nil
}

// DeleteStateOption controls how a state store reacts to a delete request.
type DeleteStateOption struct {
Concurrency string `json:"concurrency,omitempty"` // "concurrency"
Expand Down
5 changes: 5 additions & 0 deletions state/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ type QueryItem struct {
Error string `json:"error,omitempty"`
ContentType *string `json:"contentType,omitempty"`
}

// DeleteWithPrefixResponse is the object representing a delete with prefix state response containing the number of items removed.
type DeleteWithPrefixResponse struct {
Count int64 `json:"count"` // count of items removed
}
6 changes: 6 additions & 0 deletions state/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func newSQLiteStateStore(logger logger.Logger, dba DBAccess) *SQLiteStore {
state.FeatureETag,
state.FeatureTransactional,
state.FeatureTTL,
state.FeatureDeleteWithPrefix,
},
dbaccess: dba,
}
Expand Down Expand Up @@ -100,6 +101,11 @@ func (s *SQLiteStore) Multi(ctx context.Context, request *state.TransactionalSta
return s.dbaccess.ExecuteMulti(ctx, request.Operations)
}

// DeleteWithPrefix deletes an actor's state
func (s *SQLiteStore) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
return s.dbaccess.DeleteWithPrefix(ctx, req)
}

// Close implements io.Closer.
func (s *SQLiteStore) Close() error {
if s.dbaccess != nil {
Expand Down
30 changes: 30 additions & 0 deletions state/sqlite/sqlite_dbaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type DBAccess interface {
Delete(ctx context.Context, req *state.DeleteRequest) error
BulkGet(ctx context.Context, req []state.GetRequest) ([]state.BulkGetResponse, error)
ExecuteMulti(ctx context.Context, reqs []state.TransactionalStateOperation) error
DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error)
Close() error
}

Expand Down Expand Up @@ -78,6 +79,7 @@ func (a *sqliteDBAccess) Init(ctx context.Context, md state.Metadata) error {
return err
}

registerFuntions()
connString, err := a.metadata.GetConnectionString(a.logger, sqlite.GetConnectionStringOpts{})
if err != nil {
// Already logged
Expand Down Expand Up @@ -418,6 +420,34 @@ func (a *sqliteDBAccess) Delete(ctx context.Context, req *state.DeleteRequest) e
return a.doDelete(ctx, a.db, req)
}

func (a *sqliteDBAccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
if req.Prefix == "" {
return state.DeleteWithPrefixResponse{}, fmt.Errorf("missing prefix in delete with prefix operation")
}
ctx, cancel := context.WithTimeout(ctx, a.metadata.Timeout)
defer cancel()

err := req.Validate()
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

// Concatenation is required for table name because sql.DB does not substitute parameters for table names.
//nolint:gosec
result, err := a.db.ExecContext(ctx, "DELETE FROM "+a.metadata.TableName+" WHERE prefix = ?",
req.Prefix)
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

rows, err := result.RowsAffected()
if err != nil {
return state.DeleteWithPrefixResponse{}, err
}

return state.DeleteWithPrefixResponse{Count: rows}, nil
}

func (a *sqliteDBAccess) ExecuteMulti(parentCtx context.Context, reqs []state.TransactionalStateOperation) error {
tx, err := a.db.BeginTx(parentCtx, nil)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions state/sqlite/sqlite_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func TestSqliteIntegration(t *testing.T) {
multiWithSetOnly(t, s)
})

t.Run("Delete with Prefix (actor state)", func(t *testing.T) {
testDeleteWithPrefix(t, s)
})

t.Run("ttlExpireTime", func(t *testing.T) {
getExpireTime(t, s)
getBulkExpireTime(t, s)
Expand Down Expand Up @@ -610,6 +614,55 @@ func setItemWithNoKey(t *testing.T, s state.Store) {
require.Error(t, err)
}

func testDeleteWithPrefix(t *testing.T, s state.Store) {
setReq1 := &state.SetRequest{
Key: "mock-app-id||mock-actor-type||mock-actor-id||key0",
}

setReq2 := &state.SetRequest{
Key: "mock-app-id||mock-actor-type||mock-actor-id||key1",
}

setReq3 := &state.SetRequest{
Key: "mock-app-id||mock-actor-type||mock-actor-id||key2",
}

setReq4 := &state.SetRequest{
Key: "different-app-id||different-actor-type||different-actor-id||key0",
}

delReq := state.DeleteWithPrefixRequest{
Prefix: "mock-app-id||mock-actor-type||mock-actor-id",
}

err := s.Set(context.Background(), setReq1)
require.NoError(t, err)

err = s.Set(context.Background(), setReq2)
require.NoError(t, err)

err = s.Set(context.Background(), setReq3)
require.NoError(t, err)

err = s.Set(context.Background(), setReq4)
require.NoError(t, err)

res, err := s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
require.NoError(t, err)
assert.Equal(t, int64(3), res.Count)

delReq = state.DeleteWithPrefixRequest{
Prefix: "different-app-id||different-actor-type||different-actor-id||",
}
res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
require.NoError(t, err)
assert.Equal(t, int64(1), res.Count)

res, err = s.(state.DeleteWithPrefix).DeleteWithPrefix(context.Background(), delReq)
require.NoError(t, err)
assert.Equal(t, int64(0), res.Count)
}

func testSetItemWithInvalidTTL(t *testing.T, s state.Store) {
setReq := &state.SetRequest{
Key: randomKey(),
Expand Down
49 changes: 48 additions & 1 deletion state/sqlite/sqlite_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,47 @@ package sqlite
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"strings"

commonsql "github.com/dapr/components-contrib/common/component/sql"
sqlitemigrations "github.com/dapr/components-contrib/common/component/sql/migrations/sqlite"
"github.com/dapr/kit/logger"

sqlite3 "modernc.org/sqlite"
)

type migrationOptions struct {
StateTableName string
MetadataTableName string
}

func registerFuntions() {
sqlite3.RegisterDeterministicScalarFunction(
"parse_key_prefix",
1,
func(ctx *sqlite3.FunctionContext, args []driver.Value) (driver.Value, error) {
var s1 string
switch arg0 := args[0].(type) {
case string:
s1 = arg0
default:
return "", fmt.Errorf("expected argv[0] to be text")
}
if len(s1) == 0 {
return "", fmt.Errorf("cannot create prefix for empty string")
}

lastIndex := strings.LastIndex(s1, "||")
if lastIndex != -1 {
return s1[:lastIndex+2], nil
}
return "", nil
},
)
}

// Perform the required migrations
func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, opts migrationOptions) error {
m := sqlitemigrations.Migrations{
Expand Down Expand Up @@ -61,5 +90,23 @@ func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, op
}
return nil
},
})
// Migration 1: add the "prefix" column
func(ctx context.Context) error {
// Add the "prefix" column that can be used by DeleteWithPrefix
logger.Infof("Adding 'prefix' column to table '%s'", opts.StateTableName)
_, err := m.GetConn().ExecContext(
ctx,
fmt.Sprintf(
`ALTER TABLE %[1]s ADD COLUMN prefix TEXT GENERATED ALWAYS AS (parse_key_prefix(key)) VIRTUAL;
CREATE INDEX %[1]s_prefix_index ON %[1]s(prefix) WHERE prefix != ""`,
opts.StateTableName,
),
)
if err != nil {
return fmt.Errorf("failed to create virtual column: %w", err)
}
return nil
},
},
)
}
19 changes: 19 additions & 0 deletions state/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ func TestValidMultiDeleteRequest(t *testing.T) {
require.NoError(t, err)
}

func TestValidEmptyDeleteWithPrefixRequest(t *testing.T) {
t.Parallel()

ods := createSqlite(t)
res, err := ods.DeleteWithPrefix(context.Background(), createDeleteWithPrefixRequest())
require.NoError(t, err)
assert.Equal(t, int64(0), res.Count)
}

// Proves that the Ping method runs the ping method.
func TestPingRunsDBAccessPing(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -338,6 +347,10 @@ func (m *fakeDBaccess) BulkGet(parentCtx context.Context, req []state.GetRequest
return nil, nil
}

func (m *fakeDBaccess) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
return state.DeleteWithPrefixResponse{}, nil
}

func (m *fakeDBaccess) Delete(ctx context.Context, req *state.DeleteRequest) error {
return nil
}
Expand Down Expand Up @@ -389,6 +402,12 @@ func createDeleteRequest() state.DeleteRequest {
}
}

func createDeleteWithPrefixRequest() state.DeleteWithPrefixRequest {
return state.DeleteWithPrefixRequest{
Prefix: randomKey(),
}
}

func createSqliteWithFake(t *testing.T) (*SQLiteStore, *fakeDBaccess) {
ods := createSqlite(t)
fake := ods.dbaccess.(*fakeDBaccess)
Expand Down
5 changes: 5 additions & 0 deletions state/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,8 @@ func Ping(ctx context.Context, store Store) error {
return errors.New("ping is not implemented by this state store")
}
}

// DeleteWithPrefix is an interface to delete objects with a prefix.
type DeleteWithPrefix interface {
DeleteWithPrefix(ctx context.Context, req DeleteWithPrefixRequest) (DeleteWithPrefixResponse, error)
}
Binary file modified tests/certification/state/sqlite/artifacts/readonly.db
Binary file not shown.
38 changes: 38 additions & 0 deletions tests/certification/state/sqlite/artifacts/update_readonlydb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// This small utility can be used to update the "readonly.db" file when a new migration is added to the state store component.
package main

import (
"context"

"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
state_sqlite "github.com/dapr/components-contrib/state/sqlite"
"github.com/dapr/kit/logger"
)

func main() {
log := logger.NewLogger("updatedb")
log.SetOutputLevel(logger.DebugLevel)

log.Info("Initializing the component to perform migrations")
store := state_sqlite.NewSQLiteStateStore(log).(*state_sqlite.SQLiteStore)
err := store.Init(context.Background(), state.Metadata{
Base: metadata.Base{
Properties: map[string]string{
"connectionString": "file:readonly.db",
"disableWAL": "true",
},
},
})
if err != nil {
log.Fatalf("Failed to perform migrations: %v", err)
}

log.Info("Vacuuming DB")
_, err = store.GetDBAccess().GetConnection().Exec("VACUUM")
if err != nil {
log.Fatalf("Failed to vacuum database: %v", err)
}

log.Info("You are all set 😁")
}
3 changes: 2 additions & 1 deletion tests/certification/state/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ const (
keyBusyTimeout = "busyTimeout"

// Update this constant if you add more migrations
migrationLevel = "1"
// Don't forget to also run the utility `artifacts/update_readonlydb.go` to update the read-only DB
migrationLevel = "2"
)

func TestSQLite(t *testing.T) {
Expand Down